gianm commented on code in PR #17057:
URL: https://github.com/apache/druid/pull/17057#discussion_r1758543061
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
@@ -19,349 +19,285 @@
package org.apache.druid.msq.exec;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
-import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.processor.Bouncer;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.SuperSorter;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSpecs;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import
org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
+import
org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Objects;
/**
- * Class for determining how much JVM heap to allocate to various purposes.
+ * Class for determining how much JVM heap to allocate to various purposes for
executing a {@link WorkOrder}.
*
- * First, we take a chunk out of the total JVM heap that is dedicated for MSQ;
see {@link #computeUsableMemoryInJvm}.
+ * First, we split each worker's memory allotment, given by {@link
MemoryIntrospector#memoryPerTask()}, into
+ * equally-sized "bundles" for each {@link WorkOrder} that may be running
simultaneously within the {@link Worker}
+ * for that {@link WorkOrder}.
*
- * Then, we carve out some space for each worker that may be running in our
JVM; see {@link #memoryPerWorker}.
+ * Within each bundle, we carve out memory required for buffering broadcast
data
+ * (see {@link #computeBroadcastBufferMemory}) and for concurrently-running
processors
+ * (see {@link #computeProcessorMemory}).
*
- * Then, we split the rest into "bundles" of equal size; see {@link
#memoryPerBundle}. The number of bundles is based
- * entirely on server configuration; this makes the calculation robust to
different queries running simultaneously in
- * the same JVM.
- *
- * Within each bundle, we split up memory in two different ways: one assuming
it'll be used for a
- * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming
it'll be used for a regular
- * processor. Callers can then use whichever set of allocations makes sense.
(We assume no single bundle
- * will be used for both purposes.)
+ * The remainder is called "bundle free memory", a pool of memory that can be
used for {@link SuperSorter} or
+ * {@link SegmentGeneratorFrameProcessorFactory}. The amounts overlap, because
the same {@link WorkOrder} never
+ * does both.
*/
public class WorkerMemoryParameters
{
- private static final Logger log = new Logger(WorkerMemoryParameters.class);
-
- /**
- * Percent of memory that we allocate to bundles. It is less than 100%
because we need to leave some space
- * left over for miscellaneous other stuff, and to ensure that GC pressure
does not get too high.
- */
- static final double USABLE_MEMORY_FRACTION = 0.75;
-
- /**
- * Percent of each bundle's memory that we allocate to appenderators. It is
less than 100% because appenderators
- * unfortunately have a variety of unaccounted-for memory usage.
- */
- static final double APPENDERATOR_MEMORY_FRACTION = 0.67;
-
- /**
- * Size for "standard frames", which are used for most purposes, except
inputs to super-sorters.
- *
- * In particular, frames that travel between workers are always the minimum
size. This is helpful because it makes
- * it easier to compute the amount of memory needed to merge input streams.
- */
- private static final int STANDARD_FRAME_SIZE = 1_000_000;
-
- /**
- * Size for "large frames", which are used for inputs and inner channels
into super-sorters.
- *
- * This is helpful because it minimizes the number of temporary files needed
during super-sorting.
- */
- private static final int LARGE_FRAME_SIZE = 8_000_000;
-
/**
- * Minimum amount of bundle memory available for processing (i.e., total
bundle size minus the amount
- * needed for input channels). This memory is guaranteed to be available for
things like segment generation
- * and broadcast data.
+ * Default size for frames.
*/
- public static final long PROCESSING_MINIMUM_BYTES = 25_000_000;
+ public static final int DEFAULT_FRAME_SIZE = 1_000_000;
/**
- * Maximum amount of parallelism for the super-sorter. Higher amounts of
concurrency tend to be wasteful.
+ * Amount of extra memory available for each processing thread, beyond what
is needed for input and output
+ * channels. This memory is used for miscellaneous purposes within the
various {@link FrameProcessor}.
*/
- private static final int MAX_SUPER_SORTER_PROCESSORS = 4;
+ private static final long EXTRA_MEMORY_PER_PROCESSOR = 25_000_000;
/**
- * Each super-sorter must have at least 1 processor with 2 input frames and
1 output frame. That's 3 total.
+ * Percent of each bundle's free memory that we allocate to appenderators.
It is less than 100% because appenderators
+ * unfortunately have a variety of unaccounted-for memory usage.
*/
- private static final int MIN_SUPER_SORTER_FRAMES = 3;
+ private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
/**
* (Very) rough estimate of the on-heap overhead of reading a column.
*/
private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
/**
- * Maximum percent of *total* available memory (not each bundle), i.e.
{@link #USABLE_MEMORY_FRACTION}, that we'll
- * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl}
across all workers.
+ * Maximum percent of each bundle's free memory that will be used for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+ private static final double PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION
= 0.1;
/**
- * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link
ClusterByStatisticsCollectorImpl} for
- * a single worker. Acts as a limit on the value computed based on {@link
#PARTITION_STATS_MEMORY_MAX_FRACTION}.
+ * Maximum number of bytes from each bundle's free memory that we'll ever
use for maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}. Limits the value computed based
on
+ * {@link #PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION}.
*/
- private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+ private static final long PARTITION_STATS_MAX_MEMORY_PER_BUNDLE =
300_000_000;
/**
- * Threshold in bytes below which we assume that the worker is "small".
While calculating the memory requirements for
- * a small worker, we try to be as conservative with the estimates and the
extra temporary space required by the
- * frames, since that can add up quickly and cause OOM.
+ * Minimum number of bytes from each bundle's free memory that we'll use for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES =
256_000_000;
+ private static final long PARTITION_STATS_MIN_MEMORY_PER_BUNDLE = 10_000_000;
/**
- * Fraction of free memory per bundle that can be used by {@link
BroadcastJoinSegmentMapFnProcessor} to store broadcast
- * data on-heap. This is used to limit the total size of input frames, which
we expect to expand on-heap. Expansion
- * can potentially be somewhat over 2x: for example, strings are UTF-8 in
frames, but are UTF-16 on-heap, which is
- * a 2x expansion, and object and index overhead must be considered on top
of that. So we use a value somewhat
- * lower than 0.5.
+ * Fraction of each bundle's total memory that can be used to buffer
broadcast inputs. This is used by
+ * {@link BroadcastJoinSegmentMapFnProcessor} to limit how much joinable
data is stored on-heap. This is carved
+ * directly out of the total bundle memory, which makes its size more
predictable and stable: it only depends on
+ * the total JVM memory, the number of tasks per JVM, and the value of
maxConcurrentStages for the query. This
+ * stability is important, because if the broadcast buffer fills up, the
query fails. So any time its size changes,
+ * we risk queries failing that would formerly have succeeded.
*/
- static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+ private static final double BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION = 0.2;
/**
- * Fraction of free memory per bundle that can be used by
- * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
+ * Multiplier to apply to {@link #BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION}
when determining how much free bundle
+ * memory is left over. This fudge factor exists because {@link
BroadcastJoinSegmentMapFnProcessor} applies data
+ * size limits based on frame size, which we expect to expand somewhat in
memory due to indexing structures in
+ * {@link org.apache.druid.segment.join.table.FrameBasedIndexedTable}.
*/
- static final double SORT_MERGE_JOIN_MEMORY_FRACTION = 0.9;
+ private static final double BROADCAST_BUFFER_OVERHEAD_RATIO = 1.5;
/**
- * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation
overhead is added when estimating total memory required for the process.
+ * Amount of memory that can be used by
+ * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
*/
- private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
+ private static final long SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = (long)
(EXTRA_MEMORY_PER_PROCESSOR * 0.9);
- private final long processorBundleMemory;
- private final int superSorterMaxActiveProcessors;
- private final int superSorterMaxChannelsPerProcessor;
+ private final long bundleFreeMemory;
+ private final int frameSize;
+ private final int superSorterConcurrentProcessors;
+ private final int superSorterMaxChannelsPerMerger;
private final int partitionStatisticsMaxRetainedBytes;
-
- WorkerMemoryParameters(
- final long processorBundleMemory,
- final int superSorterMaxActiveProcessors,
- final int superSorterMaxChannelsPerProcessor,
- final int partitionStatisticsMaxRetainedBytes
+ private final long broadcastBufferMemory;
+
+ public WorkerMemoryParameters(
+ final long bundleFreeMemory,
+ final int frameSize,
+ final int superSorterConcurrentProcessors,
+ final int superSorterMaxChannelsPerMerger,
+ final int partitionStatisticsMaxRetainedBytes,
+ final long broadcastBufferMemory
)
{
- this.processorBundleMemory = processorBundleMemory;
- this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
- this.superSorterMaxChannelsPerProcessor =
superSorterMaxChannelsPerProcessor;
+ this.bundleFreeMemory = bundleFreeMemory;
+ this.frameSize = frameSize;
+ this.superSorterConcurrentProcessors = superSorterConcurrentProcessors;
+ this.superSorterMaxChannelsPerMerger = superSorterMaxChannelsPerMerger;
this.partitionStatisticsMaxRetainedBytes =
partitionStatisticsMaxRetainedBytes;
+ this.broadcastBufferMemory = broadcastBufferMemory;
}
/**
- * Create a production instance for {@link
org.apache.druid.msq.indexing.MSQWorkerTask}.
+ * Create a production instance for a given {@link WorkOrder}.
*/
- public static WorkerMemoryParameters createProductionInstanceForWorker(
- final Injector injector,
- final QueryDefinition queryDef,
- final int stageNumber,
+ public static WorkerMemoryParameters createProductionInstance(
+ final WorkOrder workOrder,
+ final MemoryIntrospector memoryIntrospector,
final int maxConcurrentStages
)
{
- final StageDefinition stageDef = queryDef.getStageDefinition(stageNumber);
- final IntSet inputStageNumbers =
InputSpecs.getStageNumbers(stageDef.getInputSpecs());
- final int numInputWorkers =
- inputStageNumbers.intStream()
- .map(inputStageNumber ->
queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
- .sum();
- long totalLookupFootprint = computeTotalLookupFootprint(injector);
-
- final int numHashOutputPartitions;
- if (stageDef.doesShuffle() && stageDef.getShuffleSpec().kind().isHash()) {
- numHashOutputPartitions = stageDef.getShuffleSpec().partitionCount();
- } else {
- numHashOutputPartitions = 0;
- }
-
+ final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
- Runtime.getRuntime().maxMemory(),
- computeNumWorkersInJvm(injector),
- computeNumProcessorsInJvm(injector),
+ memoryIntrospector,
+ DEFAULT_FRAME_SIZE,
+ workOrder.getInputs(),
+ stageDef.getBroadcastInputNumbers(),
+ stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
maxConcurrentStages,
- numInputWorkers,
- numHashOutputPartitions,
- totalLookupFootprint
+ computeFramesPerOutputChannel(workOrder.getOutputChannelMode())
);
}
/**
- * Returns an object specifying memory-usage parameters.
+ * Returns an object specifying memory-usage parameters for a {@link
WorkOrder} running inside a {@link Worker}.
*
* Throws a {@link MSQException} with an appropriate fault if the provided
combination of parameters cannot
* yield a workable memory situation.
*
- * @param maxMemoryInJvm memory available in the entire JVM. This
will be divided amongst processors.
- * @param numWorkersInJvm number of workers that can run
concurrently in this JVM. Generally equal to
- * the task capacity.
- * @param numProcessingThreadsInJvm size of the processing thread pool in
the JVM.
- * @param maxConcurrentStages maximum number of concurrent stages per
worker.
- * @param numInputWorkers total number of workers across all input
stages.
- * @param numHashOutputPartitions total number of output partitions, if
using hash partitioning; zero if not using
- * hash partitioning.
- * @param totalLookupFootprint estimated size of the lookups loaded by
the process.
+ * @param memoryIntrospector memory introspector
+ * @param frameSize frame size
+ * @param hasBroadcastInputs whether this stage has broadcast
inputs. If so, we allocate some memory
Review Comment:
Fixed; this was an outdated javadoc.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
@@ -19,349 +19,285 @@
package org.apache.druid.msq.exec;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
-import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.processor.Bouncer;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.SuperSorter;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSpecs;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import
org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
+import
org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Objects;
/**
- * Class for determining how much JVM heap to allocate to various purposes.
+ * Class for determining how much JVM heap to allocate to various purposes for
executing a {@link WorkOrder}.
*
- * First, we take a chunk out of the total JVM heap that is dedicated for MSQ;
see {@link #computeUsableMemoryInJvm}.
+ * First, we split each worker's memory allotment, given by {@link
MemoryIntrospector#memoryPerTask()}, into
+ * equally-sized "bundles" for each {@link WorkOrder} that may be running
simultaneously within the {@link Worker}
+ * for that {@link WorkOrder}.
*
- * Then, we carve out some space for each worker that may be running in our
JVM; see {@link #memoryPerWorker}.
+ * Within each bundle, we carve out memory required for buffering broadcast
data
+ * (see {@link #computeBroadcastBufferMemory}) and for concurrently-running
processors
+ * (see {@link #computeProcessorMemory}).
*
- * Then, we split the rest into "bundles" of equal size; see {@link
#memoryPerBundle}. The number of bundles is based
- * entirely on server configuration; this makes the calculation robust to
different queries running simultaneously in
- * the same JVM.
- *
- * Within each bundle, we split up memory in two different ways: one assuming
it'll be used for a
- * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming
it'll be used for a regular
- * processor. Callers can then use whichever set of allocations makes sense.
(We assume no single bundle
- * will be used for both purposes.)
+ * The remainder is called "bundle free memory", a pool of memory that can be
used for {@link SuperSorter} or
+ * {@link SegmentGeneratorFrameProcessorFactory}. The amounts overlap, because
the same {@link WorkOrder} never
+ * does both.
*/
public class WorkerMemoryParameters
{
- private static final Logger log = new Logger(WorkerMemoryParameters.class);
-
- /**
- * Percent of memory that we allocate to bundles. It is less than 100%
because we need to leave some space
- * left over for miscellaneous other stuff, and to ensure that GC pressure
does not get too high.
- */
- static final double USABLE_MEMORY_FRACTION = 0.75;
-
- /**
- * Percent of each bundle's memory that we allocate to appenderators. It is
less than 100% because appenderators
- * unfortunately have a variety of unaccounted-for memory usage.
- */
- static final double APPENDERATOR_MEMORY_FRACTION = 0.67;
-
- /**
- * Size for "standard frames", which are used for most purposes, except
inputs to super-sorters.
- *
- * In particular, frames that travel between workers are always the minimum
size. This is helpful because it makes
- * it easier to compute the amount of memory needed to merge input streams.
- */
- private static final int STANDARD_FRAME_SIZE = 1_000_000;
-
- /**
- * Size for "large frames", which are used for inputs and inner channels
into super-sorters.
- *
- * This is helpful because it minimizes the number of temporary files needed
during super-sorting.
- */
- private static final int LARGE_FRAME_SIZE = 8_000_000;
-
/**
- * Minimum amount of bundle memory available for processing (i.e., total
bundle size minus the amount
- * needed for input channels). This memory is guaranteed to be available for
things like segment generation
- * and broadcast data.
+ * Default size for frames.
*/
- public static final long PROCESSING_MINIMUM_BYTES = 25_000_000;
+ public static final int DEFAULT_FRAME_SIZE = 1_000_000;
/**
- * Maximum amount of parallelism for the super-sorter. Higher amounts of
concurrency tend to be wasteful.
+ * Amount of extra memory available for each processing thread, beyond what
is needed for input and output
+ * channels. This memory is used for miscellaneous purposes within the
various {@link FrameProcessor}.
*/
- private static final int MAX_SUPER_SORTER_PROCESSORS = 4;
+ private static final long EXTRA_MEMORY_PER_PROCESSOR = 25_000_000;
/**
- * Each super-sorter must have at least 1 processor with 2 input frames and
1 output frame. That's 3 total.
+ * Percent of each bundle's free memory that we allocate to appenderators.
It is less than 100% because appenderators
+ * unfortunately have a variety of unaccounted-for memory usage.
*/
- private static final int MIN_SUPER_SORTER_FRAMES = 3;
+ private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
/**
* (Very) rough estimate of the on-heap overhead of reading a column.
*/
private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
/**
- * Maximum percent of *total* available memory (not each bundle), i.e.
{@link #USABLE_MEMORY_FRACTION}, that we'll
- * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl}
across all workers.
+ * Maximum percent of each bundle's free memory that will be used for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+ private static final double PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION
= 0.1;
/**
- * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link
ClusterByStatisticsCollectorImpl} for
- * a single worker. Acts as a limit on the value computed based on {@link
#PARTITION_STATS_MEMORY_MAX_FRACTION}.
+ * Maximum number of bytes from each bundle's free memory that we'll ever
use for maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}. Limits the value computed based
on
+ * {@link #PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION}.
*/
- private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+ private static final long PARTITION_STATS_MAX_MEMORY_PER_BUNDLE =
300_000_000;
/**
- * Threshold in bytes below which we assume that the worker is "small".
While calculating the memory requirements for
- * a small worker, we try to be as conservative with the estimates and the
extra temporary space required by the
- * frames, since that can add up quickly and cause OOM.
+ * Minimum number of bytes from each bundle's free memory that we'll use for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES =
256_000_000;
+ private static final long PARTITION_STATS_MIN_MEMORY_PER_BUNDLE = 10_000_000;
/**
- * Fraction of free memory per bundle that can be used by {@link
BroadcastJoinSegmentMapFnProcessor} to store broadcast
- * data on-heap. This is used to limit the total size of input frames, which
we expect to expand on-heap. Expansion
- * can potentially be somewhat over 2x: for example, strings are UTF-8 in
frames, but are UTF-16 on-heap, which is
- * a 2x expansion, and object and index overhead must be considered on top
of that. So we use a value somewhat
- * lower than 0.5.
+ * Fraction of each bundle's total memory that can be used to buffer
broadcast inputs. This is used by
+ * {@link BroadcastJoinSegmentMapFnProcessor} to limit how much joinable
data is stored on-heap. This is carved
+ * directly out of the total bundle memory, which makes its size more
predictable and stable: it only depends on
+ * the total JVM memory, the number of tasks per JVM, and the value of
maxConcurrentStages for the query. This
+ * stability is important, because if the broadcast buffer fills up, the
query fails. So any time its size changes,
+ * we risk queries failing that would formerly have succeeded.
*/
- static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+ private static final double BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION = 0.2;
/**
- * Fraction of free memory per bundle that can be used by
- * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
+ * Multiplier to apply to {@link #BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION}
when determining how much free bundle
+ * memory is left over. This fudge factor exists because {@link
BroadcastJoinSegmentMapFnProcessor} applies data
+ * size limits based on frame size, which we expect to expand somewhat in
memory due to indexing structures in
+ * {@link org.apache.druid.segment.join.table.FrameBasedIndexedTable}.
*/
- static final double SORT_MERGE_JOIN_MEMORY_FRACTION = 0.9;
+ private static final double BROADCAST_BUFFER_OVERHEAD_RATIO = 1.5;
/**
- * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation
overhead is added when estimating total memory required for the process.
+ * Amount of memory that can be used by
+ * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
*/
- private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
+ private static final long SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = (long)
(EXTRA_MEMORY_PER_PROCESSOR * 0.9);
- private final long processorBundleMemory;
- private final int superSorterMaxActiveProcessors;
- private final int superSorterMaxChannelsPerProcessor;
+ private final long bundleFreeMemory;
+ private final int frameSize;
+ private final int superSorterConcurrentProcessors;
+ private final int superSorterMaxChannelsPerMerger;
private final int partitionStatisticsMaxRetainedBytes;
-
- WorkerMemoryParameters(
- final long processorBundleMemory,
- final int superSorterMaxActiveProcessors,
- final int superSorterMaxChannelsPerProcessor,
- final int partitionStatisticsMaxRetainedBytes
+ private final long broadcastBufferMemory;
+
+ public WorkerMemoryParameters(
+ final long bundleFreeMemory,
+ final int frameSize,
+ final int superSorterConcurrentProcessors,
+ final int superSorterMaxChannelsPerMerger,
+ final int partitionStatisticsMaxRetainedBytes,
+ final long broadcastBufferMemory
)
{
- this.processorBundleMemory = processorBundleMemory;
- this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
- this.superSorterMaxChannelsPerProcessor =
superSorterMaxChannelsPerProcessor;
+ this.bundleFreeMemory = bundleFreeMemory;
+ this.frameSize = frameSize;
+ this.superSorterConcurrentProcessors = superSorterConcurrentProcessors;
+ this.superSorterMaxChannelsPerMerger = superSorterMaxChannelsPerMerger;
this.partitionStatisticsMaxRetainedBytes =
partitionStatisticsMaxRetainedBytes;
+ this.broadcastBufferMemory = broadcastBufferMemory;
}
/**
- * Create a production instance for {@link
org.apache.druid.msq.indexing.MSQWorkerTask}.
+ * Create a production instance for a given {@link WorkOrder}.
*/
- public static WorkerMemoryParameters createProductionInstanceForWorker(
- final Injector injector,
- final QueryDefinition queryDef,
- final int stageNumber,
+ public static WorkerMemoryParameters createProductionInstance(
+ final WorkOrder workOrder,
+ final MemoryIntrospector memoryIntrospector,
final int maxConcurrentStages
)
{
- final StageDefinition stageDef = queryDef.getStageDefinition(stageNumber);
- final IntSet inputStageNumbers =
InputSpecs.getStageNumbers(stageDef.getInputSpecs());
- final int numInputWorkers =
- inputStageNumbers.intStream()
- .map(inputStageNumber ->
queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
- .sum();
- long totalLookupFootprint = computeTotalLookupFootprint(injector);
-
- final int numHashOutputPartitions;
- if (stageDef.doesShuffle() && stageDef.getShuffleSpec().kind().isHash()) {
- numHashOutputPartitions = stageDef.getShuffleSpec().partitionCount();
- } else {
- numHashOutputPartitions = 0;
- }
-
+ final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
- Runtime.getRuntime().maxMemory(),
- computeNumWorkersInJvm(injector),
- computeNumProcessorsInJvm(injector),
+ memoryIntrospector,
+ DEFAULT_FRAME_SIZE,
+ workOrder.getInputs(),
+ stageDef.getBroadcastInputNumbers(),
+ stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
maxConcurrentStages,
- numInputWorkers,
- numHashOutputPartitions,
- totalLookupFootprint
+ computeFramesPerOutputChannel(workOrder.getOutputChannelMode())
);
}
/**
- * Returns an object specifying memory-usage parameters.
+ * Returns an object specifying memory-usage parameters for a {@link
WorkOrder} running inside a {@link Worker}.
*
* Throws a {@link MSQException} with an appropriate fault if the provided
combination of parameters cannot
* yield a workable memory situation.
*
- * @param maxMemoryInJvm memory available in the entire JVM. This
will be divided amongst processors.
- * @param numWorkersInJvm number of workers that can run
concurrently in this JVM. Generally equal to
- * the task capacity.
- * @param numProcessingThreadsInJvm size of the processing thread pool in
the JVM.
- * @param maxConcurrentStages maximum number of concurrent stages per
worker.
- * @param numInputWorkers total number of workers across all input
stages.
- * @param numHashOutputPartitions total number of output partitions, if
using hash partitioning; zero if not using
- * hash partitioning.
- * @param totalLookupFootprint estimated size of the lookups loaded by
the process.
+ * @param memoryIntrospector memory introspector
+ * @param frameSize frame size
+ * @param hasBroadcastInputs whether this stage has broadcast
inputs. If so, we allocate some memory
+ * for {@link
#getBroadcastBufferMemory()}.
+ * @param needsStatistics whether this stage needs to collect
key statistics, i.e.,
Review Comment:
Fixed; this was an outdated javadoc.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
@@ -19,349 +19,285 @@
package org.apache.druid.msq.exec;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
-import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.processor.Bouncer;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.SuperSorter;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSpecs;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import
org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
+import
org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Objects;
/**
- * Class for determining how much JVM heap to allocate to various purposes.
+ * Class for determining how much JVM heap to allocate to various purposes for
executing a {@link WorkOrder}.
*
- * First, we take a chunk out of the total JVM heap that is dedicated for MSQ;
see {@link #computeUsableMemoryInJvm}.
+ * First, we split each worker's memory allotment, given by {@link
MemoryIntrospector#memoryPerTask()}, into
+ * equally-sized "bundles" for each {@link WorkOrder} that may be running
simultaneously within the {@link Worker}
+ * for that {@link WorkOrder}.
*
- * Then, we carve out some space for each worker that may be running in our
JVM; see {@link #memoryPerWorker}.
+ * Within each bundle, we carve out memory required for buffering broadcast
data
+ * (see {@link #computeBroadcastBufferMemory}) and for concurrently-running
processors
+ * (see {@link #computeProcessorMemory}).
*
- * Then, we split the rest into "bundles" of equal size; see {@link
#memoryPerBundle}. The number of bundles is based
- * entirely on server configuration; this makes the calculation robust to
different queries running simultaneously in
- * the same JVM.
- *
- * Within each bundle, we split up memory in two different ways: one assuming
it'll be used for a
- * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming
it'll be used for a regular
- * processor. Callers can then use whichever set of allocations makes sense.
(We assume no single bundle
- * will be used for both purposes.)
+ * The remainder is called "bundle free memory", a pool of memory that can be
used for {@link SuperSorter} or
+ * {@link SegmentGeneratorFrameProcessorFactory}. The amounts overlap, because
the same {@link WorkOrder} never
+ * does both.
*/
public class WorkerMemoryParameters
{
- private static final Logger log = new Logger(WorkerMemoryParameters.class);
-
- /**
- * Percent of memory that we allocate to bundles. It is less than 100%
because we need to leave some space
- * left over for miscellaneous other stuff, and to ensure that GC pressure
does not get too high.
- */
- static final double USABLE_MEMORY_FRACTION = 0.75;
-
- /**
- * Percent of each bundle's memory that we allocate to appenderators. It is
less than 100% because appenderators
- * unfortunately have a variety of unaccounted-for memory usage.
- */
- static final double APPENDERATOR_MEMORY_FRACTION = 0.67;
-
- /**
- * Size for "standard frames", which are used for most purposes, except
inputs to super-sorters.
- *
- * In particular, frames that travel between workers are always the minimum
size. This is helpful because it makes
- * it easier to compute the amount of memory needed to merge input streams.
- */
- private static final int STANDARD_FRAME_SIZE = 1_000_000;
-
- /**
- * Size for "large frames", which are used for inputs and inner channels
into super-sorters.
- *
- * This is helpful because it minimizes the number of temporary files needed
during super-sorting.
- */
- private static final int LARGE_FRAME_SIZE = 8_000_000;
-
/**
- * Minimum amount of bundle memory available for processing (i.e., total
bundle size minus the amount
- * needed for input channels). This memory is guaranteed to be available for
things like segment generation
- * and broadcast data.
+ * Default size for frames.
*/
- public static final long PROCESSING_MINIMUM_BYTES = 25_000_000;
+ public static final int DEFAULT_FRAME_SIZE = 1_000_000;
/**
- * Maximum amount of parallelism for the super-sorter. Higher amounts of
concurrency tend to be wasteful.
+ * Amount of extra memory available for each processing thread, beyond what
is needed for input and output
+ * channels. This memory is used for miscellaneous purposes within the
various {@link FrameProcessor}.
*/
- private static final int MAX_SUPER_SORTER_PROCESSORS = 4;
+ private static final long EXTRA_MEMORY_PER_PROCESSOR = 25_000_000;
/**
- * Each super-sorter must have at least 1 processor with 2 input frames and
1 output frame. That's 3 total.
+ * Percent of each bundle's free memory that we allocate to appenderators.
It is less than 100% because appenderators
+ * unfortunately have a variety of unaccounted-for memory usage.
*/
- private static final int MIN_SUPER_SORTER_FRAMES = 3;
+ private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
/**
* (Very) rough estimate of the on-heap overhead of reading a column.
*/
private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
/**
- * Maximum percent of *total* available memory (not each bundle), i.e.
{@link #USABLE_MEMORY_FRACTION}, that we'll
- * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl}
across all workers.
+ * Maximum percent of each bundle's free memory that will be used for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+ private static final double PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION
= 0.1;
/**
- * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link
ClusterByStatisticsCollectorImpl} for
- * a single worker. Acts as a limit on the value computed based on {@link
#PARTITION_STATS_MEMORY_MAX_FRACTION}.
+ * Maximum number of bytes from each bundle's free memory that we'll ever
use for maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}. Limits the value computed based
on
+ * {@link #PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION}.
*/
- private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+ private static final long PARTITION_STATS_MAX_MEMORY_PER_BUNDLE =
300_000_000;
/**
- * Threshold in bytes below which we assume that the worker is "small".
While calculating the memory requirements for
- * a small worker, we try to be as conservative as possible with the
estimates and the extra temporary space required by the
- * frames, since that can add up quickly and cause OOM.
+ * Minimum number of bytes from each bundle's free memory that we'll use for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES =
256_000_000;
+ private static final long PARTITION_STATS_MIN_MEMORY_PER_BUNDLE = 10_000_000;
/**
- * Fraction of free memory per bundle that can be used by {@link
BroadcastJoinSegmentMapFnProcessor} to store broadcast
- * data on-heap. This is used to limit the total size of input frames, which
we expect to expand on-heap. Expansion
- * can potentially be somewhat over 2x: for example, strings are UTF-8 in
frames, but are UTF-16 on-heap, which is
- * a 2x expansion, and object and index overhead must be considered on top
of that. So we use a value somewhat
- * lower than 0.5.
+ * Fraction of each bundle's total memory that can be used to buffer
broadcast inputs. This is used by
+ * {@link BroadcastJoinSegmentMapFnProcessor} to limit how much joinable
data is stored on-heap. This is carved
+ * directly out of the total bundle memory, which makes its size more
predictable and stable: it only depends on
+ * the total JVM memory, the number of tasks per JVM, and the value of
maxConcurrentStages for the query. This
+ * stability is important, because if the broadcast buffer fills up, the
query fails. So any time its size changes,
+ * we risk queries failing that would formerly have succeeded.
*/
- static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+ private static final double BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION = 0.2;
/**
- * Fraction of free memory per bundle that can be used by
- * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
+ * Multiplier to apply to {@link #BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION}
when determining how much free bundle
+ * memory is left over. This fudge factor exists because {@link
BroadcastJoinSegmentMapFnProcessor} applies data
+ * size limits based on frame size, which we expect to expand somewhat in
memory due to indexing structures in
+ * {@link org.apache.druid.segment.join.table.FrameBasedIndexedTable}.
*/
- static final double SORT_MERGE_JOIN_MEMORY_FRACTION = 0.9;
+ private static final double BROADCAST_BUFFER_OVERHEAD_RATIO = 1.5;
/**
- * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation
overhead is added when estimating total memory required for the process.
+ * Amount of memory that can be used by
+ * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
*/
- private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
+ private static final long SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = (long)
(EXTRA_MEMORY_PER_PROCESSOR * 0.9);
- private final long processorBundleMemory;
- private final int superSorterMaxActiveProcessors;
- private final int superSorterMaxChannelsPerProcessor;
+ private final long bundleFreeMemory;
+ private final int frameSize;
+ private final int superSorterConcurrentProcessors;
+ private final int superSorterMaxChannelsPerMerger;
private final int partitionStatisticsMaxRetainedBytes;
-
- WorkerMemoryParameters(
- final long processorBundleMemory,
- final int superSorterMaxActiveProcessors,
- final int superSorterMaxChannelsPerProcessor,
- final int partitionStatisticsMaxRetainedBytes
+ private final long broadcastBufferMemory;
+
+ public WorkerMemoryParameters(
+ final long bundleFreeMemory,
+ final int frameSize,
+ final int superSorterConcurrentProcessors,
+ final int superSorterMaxChannelsPerMerger,
+ final int partitionStatisticsMaxRetainedBytes,
+ final long broadcastBufferMemory
)
{
- this.processorBundleMemory = processorBundleMemory;
- this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
- this.superSorterMaxChannelsPerProcessor =
superSorterMaxChannelsPerProcessor;
+ this.bundleFreeMemory = bundleFreeMemory;
+ this.frameSize = frameSize;
+ this.superSorterConcurrentProcessors = superSorterConcurrentProcessors;
+ this.superSorterMaxChannelsPerMerger = superSorterMaxChannelsPerMerger;
this.partitionStatisticsMaxRetainedBytes =
partitionStatisticsMaxRetainedBytes;
+ this.broadcastBufferMemory = broadcastBufferMemory;
}
/**
- * Create a production instance for {@link
org.apache.druid.msq.indexing.MSQWorkerTask}.
+ * Create a production instance for a given {@link WorkOrder}.
*/
- public static WorkerMemoryParameters createProductionInstanceForWorker(
- final Injector injector,
- final QueryDefinition queryDef,
- final int stageNumber,
+ public static WorkerMemoryParameters createProductionInstance(
+ final WorkOrder workOrder,
+ final MemoryIntrospector memoryIntrospector,
final int maxConcurrentStages
)
{
- final StageDefinition stageDef = queryDef.getStageDefinition(stageNumber);
- final IntSet inputStageNumbers =
InputSpecs.getStageNumbers(stageDef.getInputSpecs());
- final int numInputWorkers =
- inputStageNumbers.intStream()
- .map(inputStageNumber ->
queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
- .sum();
- long totalLookupFootprint = computeTotalLookupFootprint(injector);
-
- final int numHashOutputPartitions;
- if (stageDef.doesShuffle() && stageDef.getShuffleSpec().kind().isHash()) {
- numHashOutputPartitions = stageDef.getShuffleSpec().partitionCount();
- } else {
- numHashOutputPartitions = 0;
- }
-
+ final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
- Runtime.getRuntime().maxMemory(),
- computeNumWorkersInJvm(injector),
- computeNumProcessorsInJvm(injector),
+ memoryIntrospector,
+ DEFAULT_FRAME_SIZE,
+ workOrder.getInputs(),
+ stageDef.getBroadcastInputNumbers(),
+ stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
maxConcurrentStages,
- numInputWorkers,
- numHashOutputPartitions,
- totalLookupFootprint
+ computeFramesPerOutputChannel(workOrder.getOutputChannelMode())
);
}
/**
- * Returns an object specifying memory-usage parameters.
+ * Returns an object specifying memory-usage parameters for a {@link
WorkOrder} running inside a {@link Worker}.
*
* Throws a {@link MSQException} with an appropriate fault if the provided
combination of parameters cannot
* yield a workable memory situation.
*
- * @param maxMemoryInJvm memory available in the entire JVM. This
will be divided amongst processors.
- * @param numWorkersInJvm number of workers that can run
concurrently in this JVM. Generally equal to
- * the task capacity.
- * @param numProcessingThreadsInJvm size of the processing thread pool in
the JVM.
- * @param maxConcurrentStages maximum number of concurrent stages per
worker.
- * @param numInputWorkers total number of workers across all input
stages.
- * @param numHashOutputPartitions total number of output partitions, if
using hash partitioning; zero if not using
- * hash partitioning.
- * @param totalLookupFootprint estimated size of the lookups loaded by
the process.
+ * @param memoryIntrospector memory introspector
+ * @param frameSize frame size
+ * @param hasBroadcastInputs whether this stage has broadcast
inputs. If so, we allocate some memory
+ * for {@link
#getBroadcastBufferMemory()}.
+ * @param needsStatistics whether this stage needs to collect
key statistics, i.e.,
+ * {@link
StageDefinition#mustGatherResultKeyStatistics()}
+ * @param inputWorkers figure from {@link
#computeNumInputWorkers}
Review Comment:
fixed, this was an outdated javadoc.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java:
##########
@@ -19,349 +19,285 @@
package org.apache.druid.msq.exec;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
-import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.processor.Bouncer;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.SuperSorter;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSpecs;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import
org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
+import
org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import
org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Objects;
/**
- * Class for determining how much JVM heap to allocate to various purposes.
+ * Class for determining how much JVM heap to allocate to various purposes for
executing a {@link WorkOrder}.
*
- * First, we take a chunk out of the total JVM heap that is dedicated for MSQ;
see {@link #computeUsableMemoryInJvm}.
+ * First, we split each worker's memory allotment, given by {@link
MemoryIntrospector#memoryPerTask()}, into
+ * equally-sized "bundles" for each {@link WorkOrder} that may be running
simultaneously within the {@link Worker}
+ * for that {@link WorkOrder}.
*
- * Then, we carve out some space for each worker that may be running in our
JVM; see {@link #memoryPerWorker}.
+ * Within each bundle, we carve out memory required for buffering broadcast
data
+ * (see {@link #computeBroadcastBufferMemory}) and for concurrently-running
processors
+ * (see {@link #computeProcessorMemory}).
*
- * Then, we split the rest into "bundles" of equal size; see {@link
#memoryPerBundle}. The number of bundles is based
- * entirely on server configuration; this makes the calculation robust to
different queries running simultaneously in
- * the same JVM.
- *
- * Within each bundle, we split up memory in two different ways: one assuming
it'll be used for a
- * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming
it'll be used for a regular
- * processor. Callers can then use whichever set of allocations makes sense.
(We assume no single bundle
- * will be used for both purposes.)
+ * The remainder is called "bundle free memory", a pool of memory that can be
used for {@link SuperSorter} or
+ * {@link SegmentGeneratorFrameProcessorFactory}. The amounts overlap, because
the same {@link WorkOrder} never
+ * does both.
*/
public class WorkerMemoryParameters
{
- private static final Logger log = new Logger(WorkerMemoryParameters.class);
-
- /**
- * Percent of memory that we allocate to bundles. It is less than 100%
because we need to leave some space
- * left over for miscellaneous other stuff, and to ensure that GC pressure
does not get too high.
- */
- static final double USABLE_MEMORY_FRACTION = 0.75;
-
- /**
- * Percent of each bundle's memory that we allocate to appenderators. It is
less than 100% because appenderators
- * unfortunately have a variety of unaccounted-for memory usage.
- */
- static final double APPENDERATOR_MEMORY_FRACTION = 0.67;
-
- /**
- * Size for "standard frames", which are used for most purposes, except
inputs to super-sorters.
- *
- * In particular, frames that travel between workers are always the minimum
size. This is helpful because it makes
- * it easier to compute the amount of memory needed to merge input streams.
- */
- private static final int STANDARD_FRAME_SIZE = 1_000_000;
-
- /**
- * Size for "large frames", which are used for inputs and inner channels
into super-sorters.
- *
- * This is helpful because it minimizes the number of temporary files needed
during super-sorting.
- */
- private static final int LARGE_FRAME_SIZE = 8_000_000;
-
/**
- * Minimum amount of bundle memory available for processing (i.e., total
bundle size minus the amount
- * needed for input channels). This memory is guaranteed to be available for
things like segment generation
- * and broadcast data.
+ * Default size for frames.
*/
- public static final long PROCESSING_MINIMUM_BYTES = 25_000_000;
+ public static final int DEFAULT_FRAME_SIZE = 1_000_000;
/**
- * Maximum amount of parallelism for the super-sorter. Higher amounts of
concurrency tend to be wasteful.
+ * Amount of extra memory available for each processing thread, beyond what
is needed for input and output
+ * channels. This memory is used for miscellaneous purposes within the
various {@link FrameProcessor}.
*/
- private static final int MAX_SUPER_SORTER_PROCESSORS = 4;
+ private static final long EXTRA_MEMORY_PER_PROCESSOR = 25_000_000;
/**
- * Each super-sorter must have at least 1 processor with 2 input frames and
1 output frame. That's 3 total.
+ * Percent of each bundle's free memory that we allocate to appenderators.
It is less than 100% because appenderators
+ * unfortunately have a variety of unaccounted-for memory usage.
*/
- private static final int MIN_SUPER_SORTER_FRAMES = 3;
+ private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
/**
* (Very) rough estimate of the on-heap overhead of reading a column.
*/
private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
/**
- * Maximum percent of *total* available memory (not each bundle), i.e.
{@link #USABLE_MEMORY_FRACTION}, that we'll
- * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl}
across all workers.
+ * Maximum percent of each bundle's free memory that will be used for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+ private static final double PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION
= 0.1;
/**
- * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link
ClusterByStatisticsCollectorImpl} for
- * a single worker. Acts as a limit on the value computed based on {@link
#PARTITION_STATS_MEMORY_MAX_FRACTION}.
+ * Maximum number of bytes from each bundle's free memory that we'll ever
use for maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}. Limits the value computed based
on
+ * {@link #PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION}.
*/
- private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+ private static final long PARTITION_STATS_MAX_MEMORY_PER_BUNDLE =
300_000_000;
/**
- * Threshold in bytes below which we assume that the worker is "small".
While calculating the memory requirements for
- * a small worker, we try to be as conservative as possible with the
estimates and the extra temporary space required by the
- * frames, since that can add up quickly and cause OOM.
+ * Minimum number of bytes from each bundle's free memory that we'll use for
maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES =
256_000_000;
+ private static final long PARTITION_STATS_MIN_MEMORY_PER_BUNDLE = 10_000_000;
/**
- * Fraction of free memory per bundle that can be used by {@link
BroadcastJoinSegmentMapFnProcessor} to store broadcast
- * data on-heap. This is used to limit the total size of input frames, which
we expect to expand on-heap. Expansion
- * can potentially be somewhat over 2x: for example, strings are UTF-8 in
frames, but are UTF-16 on-heap, which is
- * a 2x expansion, and object and index overhead must be considered on top
of that. So we use a value somewhat
- * lower than 0.5.
+ * Fraction of each bundle's total memory that can be used to buffer
broadcast inputs. This is used by
+ * {@link BroadcastJoinSegmentMapFnProcessor} to limit how much joinable
data is stored on-heap. This is carved
+ * directly out of the total bundle memory, which makes its size more
predictable and stable: it only depends on
+ * the total JVM memory, the number of tasks per JVM, and the value of
maxConcurrentStages for the query. This
+ * stability is important, because if the broadcast buffer fills up, the
query fails. So any time its size changes,
+ * we risk queries failing that would formerly have succeeded.
*/
- static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+ private static final double BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION = 0.2;
/**
- * Fraction of free memory per bundle that can be used by
- * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
+ * Multiplier to apply to {@link #BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION}
when determining how much free bundle
+ * memory is left over. This fudge factor exists because {@link
BroadcastJoinSegmentMapFnProcessor} applies data
+ * size limits based on frame size, which we expect to expand somewhat in
memory due to indexing structures in
+ * {@link org.apache.druid.segment.join.table.FrameBasedIndexedTable}.
*/
- static final double SORT_MERGE_JOIN_MEMORY_FRACTION = 0.9;
+ private static final double BROADCAST_BUFFER_OVERHEAD_RATIO = 1.5;
/**
- * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation
overhead is added when estimating total memory required for the process.
+ * Amount of memory that can be used by
+ * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor}
to buffer frames in its trackers.
*/
- private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
+ private static final long SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = (long)
(EXTRA_MEMORY_PER_PROCESSOR * 0.9);
- private final long processorBundleMemory;
- private final int superSorterMaxActiveProcessors;
- private final int superSorterMaxChannelsPerProcessor;
+ private final long bundleFreeMemory;
+ private final int frameSize;
+ private final int superSorterConcurrentProcessors;
+ private final int superSorterMaxChannelsPerMerger;
private final int partitionStatisticsMaxRetainedBytes;
-
- WorkerMemoryParameters(
- final long processorBundleMemory,
- final int superSorterMaxActiveProcessors,
- final int superSorterMaxChannelsPerProcessor,
- final int partitionStatisticsMaxRetainedBytes
+ private final long broadcastBufferMemory;
+
+ public WorkerMemoryParameters(
+ final long bundleFreeMemory,
+ final int frameSize,
+ final int superSorterConcurrentProcessors,
+ final int superSorterMaxChannelsPerMerger,
+ final int partitionStatisticsMaxRetainedBytes,
+ final long broadcastBufferMemory
)
{
- this.processorBundleMemory = processorBundleMemory;
- this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
- this.superSorterMaxChannelsPerProcessor =
superSorterMaxChannelsPerProcessor;
+ this.bundleFreeMemory = bundleFreeMemory;
+ this.frameSize = frameSize;
+ this.superSorterConcurrentProcessors = superSorterConcurrentProcessors;
+ this.superSorterMaxChannelsPerMerger = superSorterMaxChannelsPerMerger;
this.partitionStatisticsMaxRetainedBytes =
partitionStatisticsMaxRetainedBytes;
+ this.broadcastBufferMemory = broadcastBufferMemory;
}
/**
- * Create a production instance for {@link
org.apache.druid.msq.indexing.MSQWorkerTask}.
+ * Create a production instance for a given {@link WorkOrder}.
*/
- public static WorkerMemoryParameters createProductionInstanceForWorker(
- final Injector injector,
- final QueryDefinition queryDef,
- final int stageNumber,
+ public static WorkerMemoryParameters createProductionInstance(
+ final WorkOrder workOrder,
+ final MemoryIntrospector memoryIntrospector,
final int maxConcurrentStages
)
{
- final StageDefinition stageDef = queryDef.getStageDefinition(stageNumber);
- final IntSet inputStageNumbers =
InputSpecs.getStageNumbers(stageDef.getInputSpecs());
- final int numInputWorkers =
- inputStageNumbers.intStream()
- .map(inputStageNumber ->
queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
- .sum();
- long totalLookupFootprint = computeTotalLookupFootprint(injector);
-
- final int numHashOutputPartitions;
- if (stageDef.doesShuffle() && stageDef.getShuffleSpec().kind().isHash()) {
- numHashOutputPartitions = stageDef.getShuffleSpec().partitionCount();
- } else {
- numHashOutputPartitions = 0;
- }
-
+ final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
- Runtime.getRuntime().maxMemory(),
- computeNumWorkersInJvm(injector),
- computeNumProcessorsInJvm(injector),
+ memoryIntrospector,
+ DEFAULT_FRAME_SIZE,
+ workOrder.getInputs(),
+ stageDef.getBroadcastInputNumbers(),
+ stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
maxConcurrentStages,
- numInputWorkers,
- numHashOutputPartitions,
- totalLookupFootprint
+ computeFramesPerOutputChannel(workOrder.getOutputChannelMode())
);
}
/**
- * Returns an object specifying memory-usage parameters.
+ * Returns an object specifying memory-usage parameters for a {@link
WorkOrder} running inside a {@link Worker}.
*
* Throws a {@link MSQException} with an appropriate fault if the provided
combination of parameters cannot
* yield a workable memory situation.
*
- * @param maxMemoryInJvm memory available in the entire JVM. This
will be divided amongst processors.
- * @param numWorkersInJvm number of workers that can run
concurrently in this JVM. Generally equal to
- * the task capacity.
- * @param numProcessingThreadsInJvm size of the processing thread pool in
the JVM.
- * @param maxConcurrentStages maximum number of concurrent stages per
worker.
- * @param numInputWorkers total number of workers across all input
stages.
- * @param numHashOutputPartitions total number of output partitions, if
using hash partitioning; zero if not using
- * hash partitioning.
- * @param totalLookupFootprint estimated size of the lookups loaded by
the process.
+ * @param memoryIntrospector memory introspector
+ * @param frameSize frame size
+ * @param hasBroadcastInputs whether this stage has broadcast
inputs. If so, we allocate some memory
+ * for {@link
#getBroadcastBufferMemory()}.
+ * @param needsStatistics whether this stage needs to collect
key statistics, i.e.,
+ * {@link
StageDefinition#mustGatherResultKeyStatistics()}
+ * @param inputWorkers figure from {@link
#computeNumInputWorkers}
+ * @param maxInputChannelsPerProcessor figure from {@link
#computeMaxSimultaneousInputChannelsPerProcessor}
Review Comment:
fixed, this was an outdated javadoc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]