github-advanced-security[bot] commented on code in PR #17057:
URL: https://github.com/apache/druid/pull/17057#discussion_r1758522212


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
@@ -19,349 +19,285 @@
 
 package org.apache.druid.msq.exec;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
-import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.processor.Bouncer;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.SuperSorter;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSpecs;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
+import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
 import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
+import org.apache.druid.segment.incremental.IncrementalIndex;
 
+import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Objects;
 
 /**
- * Class for determining how much JVM heap to allocate to various purposes.
+ * Class for determining how much JVM heap to allocate to various purposes for executing a {@link WorkOrder}.
  *
- * First, we take a chunk out of the total JVM heap that is dedicated for MSQ; see {@link #computeUsableMemoryInJvm}.
+ * First, we split each worker's memory allotment, given by {@link MemoryIntrospector#memoryPerTask()}, into
+ * equally-sized "bundles" for each {@link WorkOrder} that may be running simultaneously within the {@link Worker}
+ * for that {@link WorkOrder}.
  *
- * Then, we carve out some space for each worker that may be running in our JVM; see {@link #memoryPerWorker}.
+ * Within each bundle, we carve out memory required for buffering broadcast data
+ * (see {@link #computeBroadcastBufferMemory}) and for concurrently-running processors
+ * (see {@link #computeProcessorMemory}).
  *
- * Then, we split the rest into "bundles" of equal size; see {@link #memoryPerBundle}. The number of bundles is based
- * entirely on server configuration; this makes the calculation robust to different queries running simultaneously in
- * the same JVM.
- *
- * Within each bundle, we split up memory in two different ways: one assuming it'll be used for a
- * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming it'll be used for a regular
- * processor. Callers can then use whichever set of allocations makes sense. (We assume no single bundle
- * will be used for both purposes.)
+ * The remainder is called "bundle free memory", a pool of memory that can be used for {@link SuperSorter} or
+ * {@link SegmentGeneratorFrameProcessorFactory}. The amounts overlap, because the same {@link WorkOrder} never
+ * does both.
  */
 public class WorkerMemoryParameters
 {
-  private static final Logger log = new Logger(WorkerMemoryParameters.class);
-
-  /**
-   * Percent of memory that we allocate to bundles. It is less than 100% because we need to leave some space
-   * left over for miscellaneous other stuff, and to ensure that GC pressure does not get too high.
-   */
-  static final double USABLE_MEMORY_FRACTION = 0.75;
-
-  /**
-   * Percent of each bundle's memory that we allocate to appenderators. It is less than 100% because appenderators
-   * unfortunately have a variety of unaccounted-for memory usage.
-   */
-  static final double APPENDERATOR_MEMORY_FRACTION = 0.67;
-
-  /**
-   * Size for "standard frames", which are used for most purposes, except inputs to super-sorters.
-   *
-   * In particular, frames that travel between workers are always the minimum size. This is helpful because it makes
-   * it easier to compute the amount of memory needed to merge input streams.
-   */
-  private static final int STANDARD_FRAME_SIZE = 1_000_000;
-
-  /**
-   * Size for "large frames", which are used for inputs and inner channels in to super-sorters.
-   *
-   * This is helpful because it minimizes the number of temporary files needed during super-sorting.
-   */
-  private static final int LARGE_FRAME_SIZE = 8_000_000;
-
   /**
-   * Minimum amount of bundle memory available for processing (i.e., total bundle size minus the amount
-   * needed for input channels). This memory is guaranteed to be available for things like segment generation
-   * and broadcast data.
+   * Default size for frames.
    */
-  public static final long PROCESSING_MINIMUM_BYTES = 25_000_000;
+  public static final int DEFAULT_FRAME_SIZE = 1_000_000;
 
   /**
-   * Maximum amount of parallelism for the super-sorter. Higher amounts of concurrency tend to be wasteful.
+   * Amount of extra memory available for each processing thread, beyond what is needed for input and output
+   * channels. This memory is used for miscellaneous purposes within the various {@link FrameProcessor}.
    */
-  private static final int MAX_SUPER_SORTER_PROCESSORS = 4;
+  private static final long EXTRA_MEMORY_PER_PROCESSOR = 25_000_000;
 
   /**
-   * Each super-sorter must have at least 1 processor with 2 input frames and 1 output frame. That's 3 total.
+   * Percent of each bundle's free memory that we allocate to appenderators. It is less than 100% because appenderators
+   * unfortunately have a variety of unaccounted-for memory usage.
    */
-  private static final int MIN_SUPER_SORTER_FRAMES = 3;
+  private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
 
   /**
    * (Very) rough estimate of the on-heap overhead of reading a column.
    */
   private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
 
   /**
-   * Maximum percent of *total* available memory (not each bundle), i.e. {@link #USABLE_MEMORY_FRACTION}, that we'll
-   * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} across all workers.
+   * Maximum percent of each bundle's free memory that will be used for maxRetainedBytes of
+   * {@link ClusterByStatisticsCollectorImpl}.
    */
-  private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+  private static final double PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION = 0.1;
 
   /**
-   * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} for
-   * a single worker. Acts as a limit on the value computed based on {@link #PARTITION_STATS_MEMORY_MAX_FRACTION}.
+   * Maximum number of bytes from each bundle's free memory that we'll ever use for maxRetainedBytes of
+   * {@link ClusterByStatisticsCollectorImpl}. Limits the value computed based on
+   * {@link #PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION}.
    */
-  private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+  private static final long PARTITION_STATS_MAX_MEMORY_PER_BUNDLE = 300_000_000;
 
   /**
-   * Threshold in bytes below which we assume that the worker is "small". While calculating the memory requirements for
-   * a small worker, we try to be as conservatives with the estimates and the extra temporary space required by the
-   * frames, since that can add up quickly and cause OOM.
+   * Minimum number of bytes from each bundle's free memory that we'll use for maxRetainedBytes of
+   * {@link ClusterByStatisticsCollectorImpl}.
    */
-  private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES = 256_000_000;
+  private static final long PARTITION_STATS_MIN_MEMORY_PER_BUNDLE = 10_000_000;
 
   /**
-   * Fraction of free memory per bundle that can be used by {@link BroadcastJoinSegmentMapFnProcessor} to store broadcast
-   * data on-heap. This is used to limit the total size of input frames, which we expect to expand on-heap. Expansion
-   * can potentially be somewhat over 2x: for example, strings are UTF-8 in frames, but are UTF-16 on-heap, which is
-   * a 2x expansion, and object and index overhead must be considered on top of that. So we use a value somewhat
-   * lower than 0.5.
+   * Fraction of each bundle's total memory that can be used to buffer broadcast inputs. This is used by
+   * {@link BroadcastJoinSegmentMapFnProcessor} to limit how much joinable data is stored on-heap. This is carved
+   * directly out of the total bundle memory, which makes its size more predictable and stable: it only depends on
+   * the total JVM memory, the number of tasks per JVM, and the value of maxConcurrentStages for the query. This
+   * stability is important, because if the broadcast buffer fills up, the query fails. So any time its size changes,
+   * we risk queries failing that would formerly have succeeded.
    */
-  static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+  private static final double BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION = 0.2;
 
   /**
-   * Fraction of free memory per bundle that can be used by
-   * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor} to buffer frames in its trackers.
+   * Multiplier to apply to {@link #BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION} when determining how much free bundle
+   * memory is left over. This fudge factor exists because {@link BroadcastJoinSegmentMapFnProcessor} applies data
+   * size limits based on frame size, which we expect to expand somewhat in memory due to indexing structures in
+   * {@link org.apache.druid.segment.join.table.FrameBasedIndexedTable}.
    */
-  static final double SORT_MERGE_JOIN_MEMORY_FRACTION = 0.9;
+  private static final double BROADCAST_BUFFER_OVERHEAD_RATIO = 1.5;
 
   /**
-   * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation overhead is added when estimating total memory required for the process.
+   * Amount of memory that can be used by
+   * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor} to buffer frames in its trackers.
    */
-  private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
+  private static final long SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = (long) (EXTRA_MEMORY_PER_PROCESSOR * 0.9);
 
-  private final long processorBundleMemory;
-  private final int superSorterMaxActiveProcessors;
-  private final int superSorterMaxChannelsPerProcessor;
+  private final long bundleFreeMemory;
+  private final int frameSize;
+  private final int superSorterConcurrentProcessors;
+  private final int superSorterMaxChannelsPerMerger;
   private final int partitionStatisticsMaxRetainedBytes;
-
-  WorkerMemoryParameters(
-      final long processorBundleMemory,
-      final int superSorterMaxActiveProcessors,
-      final int superSorterMaxChannelsPerProcessor,
-      final int partitionStatisticsMaxRetainedBytes
+  private final long broadcastBufferMemory;
+
+  public WorkerMemoryParameters(
+      final long bundleFreeMemory,
+      final int frameSize,
+      final int superSorterConcurrentProcessors,
+      final int superSorterMaxChannelsPerMerger,
+      final int partitionStatisticsMaxRetainedBytes,
+      final long broadcastBufferMemory
   )
   {
-    this.processorBundleMemory = processorBundleMemory;
-    this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
-    this.superSorterMaxChannelsPerProcessor = superSorterMaxChannelsPerProcessor;
+    this.bundleFreeMemory = bundleFreeMemory;
+    this.frameSize = frameSize;
+    this.superSorterConcurrentProcessors = superSorterConcurrentProcessors;
+    this.superSorterMaxChannelsPerMerger = superSorterMaxChannelsPerMerger;
     this.partitionStatisticsMaxRetainedBytes = partitionStatisticsMaxRetainedBytes;
+    this.broadcastBufferMemory = broadcastBufferMemory;
   }
 
   /**
-   * Create a production instance for {@link org.apache.druid.msq.indexing.MSQWorkerTask}.
+   * Create a production instance for a given {@link WorkOrder}.
    */
-  public static WorkerMemoryParameters createProductionInstanceForWorker(
-      final Injector injector,
-      final QueryDefinition queryDef,
-      final int stageNumber,
+  public static WorkerMemoryParameters createProductionInstance(
+      final WorkOrder workOrder,
+      final MemoryIntrospector memoryIntrospector,
       final int maxConcurrentStages
   )
   {
-    final StageDefinition stageDef = queryDef.getStageDefinition(stageNumber);
-    final IntSet inputStageNumbers = InputSpecs.getStageNumbers(stageDef.getInputSpecs());
-    final int numInputWorkers =
-        inputStageNumbers.intStream()
-                         .map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
-                         .sum();
-    long totalLookupFootprint = computeTotalLookupFootprint(injector);
-
-    final int numHashOutputPartitions;
-    if (stageDef.doesShuffle() && stageDef.getShuffleSpec().kind().isHash()) {
-      numHashOutputPartitions = stageDef.getShuffleSpec().partitionCount();
-    } else {
-      numHashOutputPartitions = 0;
-    }
-
+    final StageDefinition stageDef = workOrder.getStageDefinition();
     return createInstance(
-        Runtime.getRuntime().maxMemory(),
-        computeNumWorkersInJvm(injector),
-        computeNumProcessorsInJvm(injector),
+        memoryIntrospector,
+        DEFAULT_FRAME_SIZE,
+        workOrder.getInputs(),
+        stageDef.getBroadcastInputNumbers(),
+        stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
        maxConcurrentStages,
-        numInputWorkers,
-        numHashOutputPartitions,
-        totalLookupFootprint
+        computeFramesPerOutputChannel(workOrder.getOutputChannelMode())
     );
   }
 
   /**
-   * Returns an object specifying memory-usage parameters.
+   * Returns an object specifying memory-usage parameters for a {@link WorkOrder} running inside a {@link Worker}.
    *
    * Throws a {@link MSQException} with an appropriate fault if the provided combination of parameters cannot
    * yield a workable memory situation.
    *
-   * @param maxMemoryInJvm            memory available in the entire JVM. This will be divided amongst processors.
-   * @param numWorkersInJvm           number of workers that can run concurrently in this JVM. Generally equal to
-   *                                  the task capacity.
-   * @param numProcessingThreadsInJvm size of the processing thread pool in the JVM.
-   * @param maxConcurrentStages       maximum number of concurrent stages per worker.
-   * @param numInputWorkers           total number of workers across all input stages.
-   * @param numHashOutputPartitions   total number of output partitions, if using hash partitioning; zero if not using
-   *                                  hash partitioning.
-   * @param totalLookupFootprint      estimated size of the lookups loaded by the process.
+   * @param memoryIntrospector           memory introspector
+   * @param frameSize                    frame size
+   * @param hasBroadcastInputs           whether this stage has broadcast inputs. If so, we allocate some memory
+   *                                     for {@link #getBroadcastBufferMemory()}.
+   * @param needsStatistics              whether this stage needs to collect key statistics, i.e.,

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "needsStatistics" does not match any actual parameter of method 
"createInstance()".
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/7808)
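
The fix for this class of alert is mechanical: every `@param` tag in the Javadoc must name a parameter that actually appears in the method's signature, so the stray tags either need matching parameters or need to be dropped. A minimal sketch follows, using a hypothetical trimmed-down method rather than the real `createInstance()` signature, which is not fully visible in this hunk:

```java
public class JavadocParamExample
{
  /**
   * Returns a memory figure for a hypothetical stage.
   *
   * @param memoryIntrospector memory introspector
   * @param frameSize          frame size in bytes
   */
  // Every @param above names a real parameter below. A leftover tag such as
  // "@param needsStatistics" would be flagged, because no such parameter exists.
  public static long compute(final Object memoryIntrospector, final int frameSize)
  {
    // Placeholder body; the real createInstance() logic is not reproduced here.
    return frameSize;
  }
}
```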



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
+   * @param memoryIntrospector           memory introspector
+   * @param frameSize                    frame size
+   * @param hasBroadcastInputs           whether this stage has broadcast inputs. If so, we allocate some memory

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "hasBroadcastInputs" does not match any actual parameter of 
method "createInstance()".
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/7807)
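
To make the new constants concrete, here is a small illustrative calculation that plugs in the values defined in the hunk above. The 100 MB bundle size and the exact way the overhead ratio feeds into the free-memory accounting are assumptions for illustration, not logic taken from the patch:

```java
public class BundleMemoryMath
{
  public static void main(String[] args)
  {
    // Constants as defined in the diff above.
    final long extraMemoryPerProcessor = 25_000_000;
    final double broadcastBufferTotalMemoryFraction = 0.2;
    final double broadcastBufferOverheadRatio = 1.5;

    // SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = 0.9 of the extra per-processor memory: 22,500,000 bytes.
    final long sortMergeJoinMemoryPerProcessor = (long) (extraMemoryPerProcessor * 0.9);

    // Assumed bundle size, for illustration only: 100 MB per concurrently-running WorkOrder.
    final long bundleMemory = 100_000_000;

    // Broadcast buffer carved directly out of the bundle: 20 MB.
    final long broadcastBufferMemory = (long) (bundleMemory * broadcastBufferTotalMemoryFraction);

    // Assumed interpretation: free-memory accounting reserves the buffer times the overhead ratio, i.e. 30 MB.
    final long reservedForBroadcast = (long) (broadcastBufferMemory * broadcastBufferOverheadRatio);

    System.out.println("sortMergeJoinMemoryPerProcessor = " + sortMergeJoinMemoryPerProcessor);
    System.out.println("broadcastBufferMemory           = " + broadcastBufferMemory);
    System.out.println("reservedForBroadcast            = " + reservedForBroadcast);
  }
}
```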



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
+   * @param memoryIntrospector           memory introspector
+   * @param frameSize                    frame size
+   * @param hasBroadcastInputs           whether this stage has broadcast inputs. If so, we allocate some memory
+   *                                     for {@link #getBroadcastBufferMemory()}.
+   * @param needsStatistics              whether this stage needs to collect key statistics, i.e.,
+   *                                     {@link StageDefinition#mustGatherResultKeyStatistics()}
+   * @param inputWorkers                 figure from {@link #computeNumInputWorkers}

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "inputWorkers" does not match any actual parameter of method 
"createInstance()".
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/7809)
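
For reference, here is a hedged sketch of constructing the class through the new public constructor shown in this hunk; the numeric values are illustrative placeholders rather than anything computed by `createInstance()`:

```java
import org.apache.druid.msq.exec.WorkerMemoryParameters;

public class WorkerMemoryParametersExample
{
  public static WorkerMemoryParameters exampleInstance()
  {
    // Parameter order follows the public constructor added in this diff. Real code would normally
    // obtain an instance via createProductionInstance(workOrder, memoryIntrospector, maxConcurrentStages).
    return new WorkerMemoryParameters(
        50_000_000,   // bundleFreeMemory
        1_000_000,    // frameSize (matches DEFAULT_FRAME_SIZE)
        2,            // superSorterConcurrentProcessors
        4,            // superSorterMaxChannelsPerMerger
        10_000_000,   // partitionStatisticsMaxRetainedBytes
        20_000_000    // broadcastBufferMemory
    );
  }
}
```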



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
@@ -19,349 +19,285 @@
 
 package org.apache.druid.msq.exec;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
-import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.processor.Bouncer;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.SuperSorter;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSpecs;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import 
org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
+import 
org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
 import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import 
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
+import org.apache.druid.segment.incremental.IncrementalIndex;
 
+import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Objects;
 
 /**
- * Class for determining how much JVM heap to allocate to various purposes.
+ * Class for determining how much JVM heap to allocate to various purposes for 
executing a {@link WorkOrder}.
  *
- * First, we take a chunk out of the total JVM heap that is dedicated for MSQ; 
see {@link #computeUsableMemoryInJvm}.
+ * First, we split each worker's memory allotment, given by {@link 
MemoryIntrospector#memoryPerTask()}, into
+ * equally-sized "bundles" for each {@link WorkOrder} that may be running 
simultaneously within the {@link Worker}
+ * for that {@link WorkOrder}.
  *
- * Then, we carve out some space for each worker that may be running in our 
JVM; see {@link #memoryPerWorker}.
+ * Within each bundle, we carve out memory required for buffering broadcast 
data
+ * (see {@link #computeBroadcastBufferMemory}) and for concurrently-running 
processors
+ * (see {@link #computeProcessorMemory}).
  *
- * Then, we split the rest into "bundles" of equal size; see {@link 
#memoryPerBundle}. The number of bundles is based
- * entirely on server configuration; this makes the calculation robust to 
different queries running simultaneously in
- * the same JVM.
- *
- * Within each bundle, we split up memory in two different ways: one assuming 
it'll be used for a
- * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming 
it'll be used for a regular
- * processor. Callers can then use whichever set of allocations makes sense. 
(We assume no single bundle
- * will be used for both purposes.)
+ * The remainder is called "bundle free memory", a pool of memory that can be 
used for {@link SuperSorter} or
+ * {@link SegmentGeneratorFrameProcessorFactory}. The amounts overlap, because 
the same {@link WorkOrder} never
+ * does both.
  */
 public class WorkerMemoryParameters
 {
-  private static final Logger log = new Logger(WorkerMemoryParameters.class);
-
-  /**
-   * Percent of memory that we allocate to bundles. It is less than 100% 
because we need to leave some space
-   * left over for miscellaneous other stuff, and to ensure that GC pressure 
does not get too high.
-   */
-  static final double USABLE_MEMORY_FRACTION = 0.75;
-
-  /**
-   * Percent of each bundle's memory that we allocate to appenderators. It is 
less than 100% because appenderators
-   * unfortunately have a variety of unaccounted-for memory usage.
-   */
-  static final double APPENDERATOR_MEMORY_FRACTION = 0.67;
-
-  /**
-   * Size for "standard frames", which are used for most purposes, except 
inputs to super-sorters.
-   *
-   * In particular, frames that travel between workers are always the minimum 
size. This is helpful because it makes
-   * it easier to compute the amount of memory needed to merge input streams.
-   */
-  private static final int STANDARD_FRAME_SIZE = 1_000_000;
-
-  /**
-   * Size for "large frames", which are used for inputs and inner channels in 
to super-sorters.
-   *
-   * This is helpful because it minimizes the number of temporary files needed 
during super-sorting.
-   */
-  private static final int LARGE_FRAME_SIZE = 8_000_000;
-
   /**
-   * Minimum amount of bundle memory available for processing (i.e., total 
bundle size minus the amount
-   * needed for input channels). This memory is guaranteed to be available for 
things like segment generation
-   * and broadcast data.
+   * Default size for frames.
    */
-  public static final long PROCESSING_MINIMUM_BYTES = 25_000_000;
+  public static final int DEFAULT_FRAME_SIZE = 1_000_000;
 
   /**
-   * Maximum amount of parallelism for the super-sorter. Higher amounts of 
concurrency tend to be wasteful.
+   * Amount of extra memory available for each processing thread, beyond what 
is needed for input and output
+   * channels. This memory is used for miscellaneous purposes within the 
various {@link FrameProcessor}.
    */
-  private static final int MAX_SUPER_SORTER_PROCESSORS = 4;
+  private static final long EXTRA_MEMORY_PER_PROCESSOR = 25_000_000;
 
   /**
-   * Each super-sorter must have at least 1 processor with 2 input frames and 
1 output frame. That's 3 total.
+   * Percent of each bundle's free memory that we allocate to appenderators. 
It is less than 100% because appenderators
+   * unfortunately have a variety of unaccounted-for memory usage.
    */
-  private static final int MIN_SUPER_SORTER_FRAMES = 3;
+  private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
 
   /**
    * (Very) rough estimate of the on-heap overhead of reading a column.
    */
   private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
 
   /**
-   * Maximum percent of *total* available memory (not each bundle), i.e. {@link #USABLE_MEMORY_FRACTION}, that we'll
-   * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} across all workers.
+   * Maximum percent of each bundle's free memory that will be used for maxRetainedBytes of
+   * {@link ClusterByStatisticsCollectorImpl}.
   */
-  private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+  private static final double PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION = 0.1;
 
   /**
-   * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} for
-   * a single worker. Acts as a limit on the value computed based on {@link #PARTITION_STATS_MEMORY_MAX_FRACTION}.
+   * Maximum number of bytes from each bundle's free memory that we'll ever use for maxRetainedBytes of
+   * {@link ClusterByStatisticsCollectorImpl}. Limits the value computed based on
+   * {@link #PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION}.
   */
-  private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+  private static final long PARTITION_STATS_MAX_MEMORY_PER_BUNDLE = 300_000_000;
 
   /**
-   * Threshold in bytes below which we assume that the worker is "small". While calculating the memory requirements for
-   * a small worker, we try to be as conservatives with the estimates and the extra temporary space required by the
-   * frames, since that can add up quickly and cause OOM.
+   * Minimum number of bytes from each bundle's free memory that we'll use for maxRetainedBytes of
+   * {@link ClusterByStatisticsCollectorImpl}.
   */
-  private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES = 256_000_000;
+  private static final long PARTITION_STATS_MIN_MEMORY_PER_BUNDLE = 10_000_000;
 
   /**
-   * Fraction of free memory per bundle that can be used by {@link BroadcastJoinSegmentMapFnProcessor} to store broadcast
-   * data on-heap. This is used to limit the total size of input frames, which we expect to expand on-heap. Expansion
-   * can potentially be somewhat over 2x: for example, strings are UTF-8 in frames, but are UTF-16 on-heap, which is
-   * a 2x expansion, and object and index overhead must be considered on top of that. So we use a value somewhat
-   * lower than 0.5.
+   * Fraction of each bundle's total memory that can be used to buffer broadcast inputs. This is used by
+   * {@link BroadcastJoinSegmentMapFnProcessor} to limit how much joinable data is stored on-heap. This is carved
+   * directly out of the total bundle memory, which makes its size more predictable and stable: it only depends on
+   * the total JVM memory, the number of tasks per JVM, and the value of maxConcurrentStages for the query. This
+   * stability is important, because if the broadcast buffer fills up, the query fails. So any time its size changes,
+   * we risk queries failing that would formerly have succeeded.
    */
-  static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+  private static final double BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION = 0.2;
 
   /**
-   * Fraction of free memory per bundle that can be used by
-   * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor} to buffer frames in its trackers.
+   * Multiplier to apply to {@link #BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION} when determining how much free bundle
+   * memory is left over. This fudge factor exists because {@link BroadcastJoinSegmentMapFnProcessor} applies data
+   * size limits based on frame size, which we expect to expand somewhat in memory due to indexing structures in
+   * {@link org.apache.druid.segment.join.table.FrameBasedIndexedTable}.
    */
-  static final double SORT_MERGE_JOIN_MEMORY_FRACTION = 0.9;
+  private static final double BROADCAST_BUFFER_OVERHEAD_RATIO = 1.5;
 
   /**
-   * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation overhead is added when estimating total memory required for the process.
+   * Amount of memory that can be used by
+   * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor} to buffer frames in its trackers.
   */
-  private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
+  private static final long SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = (long) (EXTRA_MEMORY_PER_PROCESSOR * 0.9);
 
-  private final long processorBundleMemory;
-  private final int superSorterMaxActiveProcessors;
-  private final int superSorterMaxChannelsPerProcessor;
+  private final long bundleFreeMemory;
+  private final int frameSize;
+  private final int superSorterConcurrentProcessors;
+  private final int superSorterMaxChannelsPerMerger;
   private final int partitionStatisticsMaxRetainedBytes;
-
-  WorkerMemoryParameters(
-      final long processorBundleMemory,
-      final int superSorterMaxActiveProcessors,
-      final int superSorterMaxChannelsPerProcessor,
-      final int partitionStatisticsMaxRetainedBytes
+  private final long broadcastBufferMemory;
+
+  public WorkerMemoryParameters(
+      final long bundleFreeMemory,
+      final int frameSize,
+      final int superSorterConcurrentProcessors,
+      final int superSorterMaxChannelsPerMerger,
+      final int partitionStatisticsMaxRetainedBytes,
+      final long broadcastBufferMemory
   )
   {
-    this.processorBundleMemory = processorBundleMemory;
-    this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
-    this.superSorterMaxChannelsPerProcessor = superSorterMaxChannelsPerProcessor;
+    this.bundleFreeMemory = bundleFreeMemory;
+    this.frameSize = frameSize;
+    this.superSorterConcurrentProcessors = superSorterConcurrentProcessors;
+    this.superSorterMaxChannelsPerMerger = superSorterMaxChannelsPerMerger;
     this.partitionStatisticsMaxRetainedBytes = partitionStatisticsMaxRetainedBytes;
+    this.broadcastBufferMemory = broadcastBufferMemory;
   }
 
   /**
-   * Create a production instance for {@link org.apache.druid.msq.indexing.MSQWorkerTask}.
+   * Create a production instance for a given {@link WorkOrder}.
    */
-  public static WorkerMemoryParameters createProductionInstanceForWorker(
-      final Injector injector,
-      final QueryDefinition queryDef,
-      final int stageNumber,
+  public static WorkerMemoryParameters createProductionInstance(
+      final WorkOrder workOrder,
+      final MemoryIntrospector memoryIntrospector,
       final int maxConcurrentStages
   )
   {
-    final StageDefinition stageDef = queryDef.getStageDefinition(stageNumber);
-    final IntSet inputStageNumbers = InputSpecs.getStageNumbers(stageDef.getInputSpecs());
-    final int numInputWorkers =
-        inputStageNumbers.intStream()
-                         .map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
-                         .sum();
-    long totalLookupFootprint = computeTotalLookupFootprint(injector);
-
-    final int numHashOutputPartitions;
-    if (stageDef.doesShuffle() && stageDef.getShuffleSpec().kind().isHash()) {
-      numHashOutputPartitions = stageDef.getShuffleSpec().partitionCount();
-    } else {
-      numHashOutputPartitions = 0;
-    }
-
+    final StageDefinition stageDef = workOrder.getStageDefinition();
     return createInstance(
-        Runtime.getRuntime().maxMemory(),
-        computeNumWorkersInJvm(injector),
-        computeNumProcessorsInJvm(injector),
+        memoryIntrospector,
+        DEFAULT_FRAME_SIZE,
+        workOrder.getInputs(),
+        stageDef.getBroadcastInputNumbers(),
+        stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
         maxConcurrentStages,
-        numInputWorkers,
-        numHashOutputPartitions,
-        totalLookupFootprint
+        computeFramesPerOutputChannel(workOrder.getOutputChannelMode())
     );
   }
 
   /**
-   * Returns an object specifying memory-usage parameters.
+   * Returns an object specifying memory-usage parameters for a {@link WorkOrder} running inside a {@link Worker}.
    *
    * Throws a {@link MSQException} with an appropriate fault if the provided combination of parameters cannot
    * yield a workable memory situation.
    *
-   * @param maxMemoryInJvm            memory available in the entire JVM. This will be divided amongst processors.
-   * @param numWorkersInJvm           number of workers that can run concurrently in this JVM. Generally equal to
-   *                                  the task capacity.
-   * @param numProcessingThreadsInJvm size of the processing thread pool in the JVM.
-   * @param maxConcurrentStages       maximum number of concurrent stages per worker.
-   * @param numInputWorkers           total number of workers across all input stages.
-   * @param numHashOutputPartitions   total number of output partitions, if using hash partitioning; zero if not using
-   *                                  hash partitioning.
-   * @param totalLookupFootprint      estimated size of the lookups loaded by the process.
+   * @param memoryIntrospector           memory introspector
+   * @param frameSize                    frame size
+   * @param hasBroadcastInputs           whether this stage has broadcast inputs. If so, we allocate some memory
+   *                                     for {@link #getBroadcastBufferMemory()}.
+   * @param needsStatistics              whether this stage needs to collect key statistics, i.e.,
+   *                                     {@link StageDefinition#mustGatherResultKeyStatistics()}
+   * @param inputWorkers                 figure from {@link #computeNumInputWorkers}
+   * @param maxInputChannelsPerProcessor figure from {@link #computeMaxSimultaneousInputChannelsPerProcessor}
+   * @param maxConcurrentStages          figure from {@link WorkerContext#maxConcurrentStages()}
+   * @param maxOutputPartitions          figure from {@link #computeMaxOutputPartitions(ShuffleSpec)}

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "maxOutputPartitions" does not match any actual parameter of 
method "createInstance()".
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/7811)
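
To make the alert concrete, here is a minimal, hypothetical sketch of the pattern it flags; the names below are invented and are not the real `createInstance()` parameter list. A `@param` tag left over from an earlier version of the signature no longer names a declared parameter, and the fix is to delete the stale tag (or rename it to whatever parameter replaced it) so the Javadoc matches the method as it now stands.

```java
// Hypothetical illustration only -- not the actual WorkerMemoryParameters API.
class JavadocParamExample
{
  /**
   * Before: "maxOutputPartitions" was dropped from the signature during a refactor,
   * so its @param tag no longer matches any declared parameter, which is exactly
   * what the "Spurious Javadoc @param tags" alert reports.
   *
   * @param frameSize           frame size in bytes
   * @param maxConcurrentStages maximum number of concurrent stages per worker
   * @param maxOutputPartitions stale tag: no such parameter exists any more
   */
  static long budgetWithStaleTag(final int frameSize, final int maxConcurrentStages)
  {
    return (long) frameSize * maxConcurrentStages;
  }

  /**
   * After: the @param list mirrors the actual parameter list, so the alert goes away.
   *
   * @param frameSize           frame size in bytes
   * @param maxConcurrentStages maximum number of concurrent stages per worker
   */
  static long budgetWithMatchingTags(final int frameSize, final int maxConcurrentStages)
  {
    return (long) frameSize * maxConcurrentStages;
  }
}
```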



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "maxInputChannelsPerProcessor" does not match any actual 
parameter of method "createInstance()".
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/7810)
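
As with the previous alert, the fix is to prune or rename the stale `@param maxInputChannelsPerProcessor` tag so the Javadoc tracks the actual signature. If it helps to reproduce the warning locally, the rough sketch below runs the standard javadoc tool programmatically with DocLint reference checks enabled, which should surface `@param` tags that do not name a real parameter; the output directory is a placeholder, and running against this single file will also complain about unresolved Druid dependencies unless a proper classpath is supplied.

```java
import javax.tools.DocumentationTool;
import javax.tools.ToolProvider;

// Rough local check (not part of the Druid build): run javadoc with DocLint's
// reference checks so stale @param tags show up as warnings before CI does.
public class JavadocLintCheck
{
  public static void main(String[] args)
  {
    final DocumentationTool javadoc = ToolProvider.getSystemDocumentationTool();
    if (javadoc == null) {
      throw new IllegalStateException("No documentation tool available (JRE-only runtime?)");
    }
    final int exitCode = javadoc.run(
        null,         // no stdin needed
        System.out,   // standard output
        System.err,   // error stream (DocLint warnings typically appear here)
        "-Xdoclint:reference",
        "-d", "target/javadoc-lint",  // placeholder output directory
        "extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java"
    );
    System.out.println("javadoc exit code: " + exitCode);
  }
}
```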



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
