This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9423aa9163 MSQ: Consider PARTITION_STATS_MAX_BYTES in WorkerMemoryParameters. (#13274)
9423aa9163 is described below
commit 9423aa9163ce60f9572e9042c0dcb370fe81770f
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Nov 7 00:57:18 2022 -0800
MSQ: Consider PARTITION_STATS_MAX_BYTES in WorkerMemoryParameters. (#13274)
* MSQ: Consider PARTITION_STATS_MAX_BYTES in WorkerMemoryParameters.
This consideration is important, because otherwise we can run out of
memory due to large statistics-tracking objects.
* Improved calculations.
---
docs/multi-stage-query/concepts.md | 4 +-
docs/multi-stage-query/reference.md | 2 +-
.../org/apache/druid/msq/exec/ControllerImpl.java | 6 +-
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 6 +-
.../druid/msq/exec/WorkerMemoryParameters.java | 230 +++++++++++++++++++--
.../druid/msq/indexing/IndexerWorkerContext.java | 85 +-------
.../msq/indexing/error/NotEnoughMemoryFault.java | 16 +-
.../apache/druid/msq/kernel/StageDefinition.java | 7 +-
.../kernel/controller/ControllerQueryKernel.java | 7 +-
.../kernel/controller/ControllerStageTracker.java | 23 ++-
.../druid/msq/exec/WorkerMemoryParametersTest.java | 57 +++--
.../msq/indexing/IndexerWorkerContextTest.java | 3 +-
.../msq/indexing/error/MSQFaultSerdeTest.java | 2 +-
.../controller/BaseControllerQueryKernelTest.java | 10 +-
.../org/apache/druid/msq/test/MSQTestBase.java | 31 ++-
.../druid/msq/test/MSQTestWorkerContext.java | 3 +-
16 files changed, 326 insertions(+), 166 deletions(-)
diff --git a/docs/multi-stage-query/concepts.md b/docs/multi-stage-query/concepts.md
index 0e8dd52312..ae64478243 100644
--- a/docs/multi-stage-query/concepts.md
+++ b/docs/multi-stage-query/concepts.md
@@ -257,8 +257,8 @@ On Peons launched by Middle Managers, the bulk of the JVM heap (75%, less any sp
 [lookups](../querying/lookups.md)) is split up into two bundles of equal size: one processor bundle and one worker
 bundle. Each one comprises 37.5% of the available JVM heap, less any space used by [lookups](../querying/lookups.md).
-Depending on the type of query, each worker and controller task can use a sketch for generating partition boundaries.
-Each sketch uses at most approximately 300 MB.
+Depending on the type of query, controller and worker tasks may use sketches for determining partition boundaries.
+The heap footprint of these sketches is capped at 10% of available memory, or 300 MB, whichever is lower.
 The processor memory bundle is used for query processing and segment generation. Each processor bundle must also
 provide space to buffer I/O between stages. Specifically, each downstream stage requires 1 MB of buffer space for each
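The sketch cap described in the doc change above can be illustrated with a small sketch. The constants mirror the new fields added to WorkerMemoryParameters in this commit; the class and method names here are illustrative, not part of Druid's public API.

```java
// Illustrative sketch of the per-worker cap on partition-statistics memory:
// at most 10% of usable heap across all workers, and at most 300 MB per worker.
public class SketchMemoryCap
{
  static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
  static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000L;

  static long memoryPerWorker(long usableMemoryInJvm, int numWorkersInJvm)
  {
    // Compute the total allowance across all workers, then divide it evenly.
    final long memoryForWorkers = (long) Math.min(
        usableMemoryInJvm * PARTITION_STATS_MEMORY_MAX_FRACTION,
        (double) numWorkersInJvm * PARTITION_STATS_MEMORY_MAX_BYTES
    );
    return memoryForWorkers / numWorkersInJvm;
  }
}
```

For example, with 4 GB of usable heap and one worker the 300 MB ceiling applies; with 1 GB of usable heap the 10% cap binds and the worker gets 100 MB.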
diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index a50e7991dc..107631595b 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -251,7 +251,7 @@ The following table describes error codes you may encounter in the `multiStageQu
 | TooManyColumns | Exceeded the number of columns for a stage. See the [Limits](#limits) table for the specific limit. | `maxColumns`: The limit on columns which was exceeded. |
 | TooManyWarnings | Exceeded the allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
 | TooManyWorkers | Exceeded the supported number of workers running simultaneously. See the [Limits](#limits) table for the specific limit. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. |
-| NotEnoughMemory | Insufficient memory to launch a stage. | `serverMemory`: The amount of memory available to a single process.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of threads in a single process. |
+| NotEnoughMemory | Insufficient memory to launch a stage. | `serverMemory`: The amount of memory available to a single process.<br /><br />`usableMemory`: The amount of server memory usable by query-related work. Excludes space taken by lookups plus an additional 25% overhead.<br /><br />`serverWorkers`: The number of workers running in a single process.<br /><br />`serverThreads`: The number of threads in a single process. |
 | WorkerFailed | A worker task failed unexpectedly. | `workerTaskId`: The ID of the worker task. |
 | WorkerRpcFailed | A remote procedure call to a worker task failed and could not recover. | `workerTaskId`: the id of the worker task |
 | UnknownError | All other errors. | |
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 8eb348bf50..647a756757 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1942,7 +1942,11 @@ public class ControllerImpl implements Controller
this.queryDef = queryDef;
this.inputSpecSlicerFactory = inputSpecSlicerFactory;
this.closer = closer;
-    this.queryKernel = new ControllerQueryKernel(queryDef);
+    this.queryKernel = new ControllerQueryKernel(
+        queryDef,
+        WorkerMemoryParameters.createProductionInstanceForController(context.injector())
+                              .getPartitionStatisticsMaxRetainedBytes()
+    );
}
/**
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index ad1cdfa04c..5b68041d0e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -1062,6 +1062,7 @@ public class WorkerImpl implements Worker
clusterBy,
processorOutputChannels,
exec,
+ memoryParameters.getPartitionStatisticsMaxRetainedBytes(),
cancellationId,
kernelManipulationQueue
);
@@ -1102,6 +1103,7 @@ public class WorkerImpl implements Worker
final ClusterBy clusterBy,
final OutputChannels processorOutputChannels,
final FrameProcessorExecutor exec,
+ final int partitionStatisticsMaxRetainedBytes,
final String cancellationId,
final BlockingQueue<Consumer<KernelHolder>> kernelManipulationQueue
)
@@ -1119,7 +1121,7 @@ public class WorkerImpl implements Worker
                 channel.writable(),
                 stageDefinition.getFrameReader(),
                 clusterBy,
-                stageDefinition.createResultKeyStatisticsCollector()
+                stageDefinition.createResultKeyStatisticsCollector(partitionStatisticsMaxRetainedBytes)
             )
       );
     }
@@ -1127,7 +1129,7 @@ public class WorkerImpl implements Worker
     final ListenableFuture<ClusterByStatisticsCollector> clusterByStatisticsCollectorFuture =
         exec.runAllFully(
             Sequences.simple(processors),
-            stageDefinition.createResultKeyStatisticsCollector(),
+            stageDefinition.createResultKeyStatisticsCollector(partitionStatisticsMaxRetainedBytes),
             ClusterByStatisticsCollector::addAll,
             // Run all processors simultaneously. They are lightweight and this keeps things moving.
             processors.size(),
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index 9f00331191..fe812e2a29 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -20,18 +20,35 @@
package org.apache.druid.msq.exec;
import com.google.common.primitives.Ints;
+import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
+import org.apache.druid.msq.input.InputSpecs;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
+import org.apache.druid.query.lookup.LookupExtractor;
+import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
+import org.apache.druid.query.lookup.LookupReferencesManager;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;

 import java.util.Objects;

 /**
  * Class for determining how much JVM heap to allocate to various purposes.
  *
- * First, we take {@link #USABLE_MEMORY_FRACTION} out of the total JVM heap and split it into "bundles" of
- * equal size. The number of bundles is based entirely on server configuration; this makes the calculation
- * robust to different queries running simultaneously in the same JVM.
+ * First, we take a chunk out of the total JVM heap that is dedicated for MSQ; see {@link #computeUsableMemoryInJvm}.
+ *
+ * Then, we carve out some space for each worker that may be running in our JVM; see {@link #memoryPerWorker}.
+ *
+ * Then, we split the rest into "bundles" of equal size; see {@link #memoryPerBundle}. The number of bundles is based
+ * entirely on server configuration; this makes the calculation robust to different queries running simultaneously in
+ * the same JVM.
  *
  * Then, we split up the resources for each bundle in two different ways: one assuming it'll be used for a
  * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming it'll be used for a regular
@@ -40,11 +57,13 @@ import java.util.Objects;
*/
public class WorkerMemoryParameters
{
+ private static final Logger log = new Logger(WorkerMemoryParameters.class);
+
   /**
    * Percent of memory that we allocate to bundles. It is less than 100% because we need to leave some space
    * left over for miscellaneous other stuff, and to ensure that GC pressure does not get too high.
    */
-  private static final double USABLE_MEMORY_FRACTION = 0.75;
+  static final double USABLE_MEMORY_FRACTION = 0.75;
   /**
    * Percent of each bundle's memory that we allocate to appenderators. It is less than 100% because appenderators
@@ -89,6 +108,18 @@ public class WorkerMemoryParameters
    */
   private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
+  /**
+   * Maximum percent of *total* available memory (not each bundle), i.e. {@link #USABLE_MEMORY_FRACTION}, that we'll
+   * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} across all workers.
+   */
+  private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+
+  /**
+   * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} for
+   * a single worker. Acts as a limit on the value computed based on {@link #PARTITION_STATS_MEMORY_MAX_FRACTION}.
+   */
+  private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+
   /**
    * Fraction of free memory per bundle that can be used by {@link org.apache.druid.msq.querykit.BroadcastJoinHelper}
    * to store broadcast data on-heap. This is used to limit the total size of input frames, which we expect to
@@ -102,22 +133,64 @@ public class WorkerMemoryParameters
   private final int superSorterMaxChannelsPerProcessor;
   private final long appenderatorMemory;
   private final long broadcastJoinMemory;
+  private final int partitionStatisticsMaxRetainedBytes;

   WorkerMemoryParameters(
       final int superSorterMaxActiveProcessors,
       final int superSorterMaxChannelsPerProcessor,
       final long appenderatorMemory,
-      final long broadcastJoinMemory
+      final long broadcastJoinMemory,
+      final int partitionStatisticsMaxRetainedBytes
   )
   {
     this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
     this.superSorterMaxChannelsPerProcessor = superSorterMaxChannelsPerProcessor;
     this.appenderatorMemory = appenderatorMemory;
     this.broadcastJoinMemory = broadcastJoinMemory;
+    this.partitionStatisticsMaxRetainedBytes = partitionStatisticsMaxRetainedBytes;
   }
   /**
-   * Returns an object specifying memory-usage parameters for a stage in a worker.
+   * Create a production instance for {@link org.apache.druid.msq.indexing.MSQControllerTask}.
+   */
+  public static WorkerMemoryParameters createProductionInstanceForController(final Injector injector)
+  {
+    return createInstance(
+        Runtime.getRuntime().maxMemory(),
+        computeUsableMemoryInJvm(injector),
+        computeNumWorkersInJvm(injector),
+        computeNumProcessorsInJvm(injector),
+        0
+    );
+  }
+
+  /**
+   * Create a production instance for {@link org.apache.druid.msq.indexing.MSQWorkerTask}.
+   */
+  public static WorkerMemoryParameters createProductionInstanceForWorker(
+      final Injector injector,
+      final QueryDefinition queryDef,
+      final int stageNumber
+  )
+  {
+    final IntSet inputStageNumbers = InputSpecs.getStageNumbers(queryDef.getStageDefinition(stageNumber).getInputSpecs());
+    final int numInputWorkers =
+        inputStageNumbers.intStream()
+                         .map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
+                         .sum();
+
+    return createInstance(
+        Runtime.getRuntime().maxMemory(),
+        computeUsableMemoryInJvm(injector),
+        computeNumWorkersInJvm(injector),
+        computeNumProcessorsInJvm(injector),
+        numInputWorkers
+    );
+  }
+
+  /**
+   * Returns an object specifying memory-usage parameters.
    *
    * Throws a {@link MSQException} with an appropriate fault if the provided combination of parameters cannot
    * yield a workable memory situation.
@@ -128,19 +201,21 @@ public class WorkerMemoryParameters
    * @param numProcessingThreadsInJvm size of the processing thread pool in the JVM.
    * @param numInputWorkers           number of workers across input stages that need to be merged together.
    */
-  public static WorkerMemoryParameters compute(
+  public static WorkerMemoryParameters createInstance(
       final long maxMemoryInJvm,
+      final long usableMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm,
       final int numInputWorkers
   )
   {
-    final long bundleMemory = memoryPerBundle(maxMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
+    final long workerMemory = memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
+    final long bundleMemory = memoryPerBundle(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
     final long bundleMemoryForInputChannels = memoryNeededForInputChannels(numInputWorkers);
     final long bundleMemoryForProcessing = bundleMemory - bundleMemoryForInputChannels;

     if (bundleMemoryForProcessing < PROCESSING_MINIMUM_BYTES) {
-      final int maxWorkers = computeMaxWorkers(maxMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
+      final int maxWorkers = computeMaxWorkers(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);

       if (maxWorkers > 0) {
         throw new MSQException(new TooManyWorkersFault(numInputWorkers, Math.min(Limits.MAX_WORKERS, maxWorkers)));
@@ -149,6 +224,7 @@ public class WorkerMemoryParameters
         throw new MSQException(
             new NotEnoughMemoryFault(
                 maxMemoryInJvm,
+                usableMemoryInJvm,
                 numWorkersInJvm,
                 numProcessingThreadsInJvm
             )
@@ -160,7 +236,14 @@ public class WorkerMemoryParameters
     final int maxNumFramesForSuperSorter = Ints.checkedCast(bundleMemory / WorkerMemoryParameters.LARGE_FRAME_SIZE);

     if (maxNumFramesForSuperSorter < MIN_SUPER_SORTER_FRAMES) {
-      throw new MSQException(new NotEnoughMemoryFault(maxMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm));
+      throw new MSQException(
+          new NotEnoughMemoryFault(
+              maxMemoryInJvm,
+              usableMemoryInJvm,
+              numWorkersInJvm,
+              numProcessingThreadsInJvm
+          )
+      );
     }

     final int superSorterMaxActiveProcessors = Math.min(
@@ -178,7 +261,8 @@ public class WorkerMemoryParameters
         superSorterMaxActiveProcessors,
         superSorterMaxChannelsPerProcessor,
         (long) (bundleMemoryForProcessing * APPENDERATOR_MEMORY_FRACTION),
-        (long) (bundleMemoryForProcessing * BROADCAST_JOIN_MEMORY_FRACTION)
+        (long) (bundleMemoryForProcessing * BROADCAST_JOIN_MEMORY_FRACTION),
+        Ints.checkedCast(workerMemory) // 100% of worker memory is devoted to partition statistics
     );
   }
@@ -219,6 +303,11 @@ public class WorkerMemoryParameters
return broadcastJoinMemory;
}
+ public int getPartitionStatisticsMaxRetainedBytes()
+ {
+ return partitionStatisticsMaxRetainedBytes;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -232,7 +321,8 @@ public class WorkerMemoryParameters
     return superSorterMaxActiveProcessors == that.superSorterMaxActiveProcessors
            && superSorterMaxChannelsPerProcessor == that.superSorterMaxChannelsPerProcessor
            && appenderatorMemory == that.appenderatorMemory
-           && broadcastJoinMemory == that.broadcastJoinMemory;
+           && broadcastJoinMemory == that.broadcastJoinMemory
+           && partitionStatisticsMaxRetainedBytes == that.partitionStatisticsMaxRetainedBytes;
   }
@Override
@@ -242,7 +332,8 @@ public class WorkerMemoryParameters
superSorterMaxActiveProcessors,
superSorterMaxChannelsPerProcessor,
appenderatorMemory,
- broadcastJoinMemory
+ broadcastJoinMemory,
+ partitionStatisticsMaxRetainedBytes
);
}
@@ -254,41 +345,138 @@ public class WorkerMemoryParameters
            ", superSorterMaxChannelsPerProcessor=" + superSorterMaxChannelsPerProcessor +
            ", appenderatorMemory=" + appenderatorMemory +
            ", broadcastJoinMemory=" + broadcastJoinMemory +
+           ", partitionStatisticsMaxRetainedBytes=" + partitionStatisticsMaxRetainedBytes +
            '}';
   }
   /**
    * Computes the highest value of numInputWorkers, for the given parameters, that can be passed to
-   * {@link #compute} without resulting in a {@link TooManyWorkersFault}.
+   * {@link #createInstance} without resulting in a {@link TooManyWorkersFault}.
    *
    * Returns 0 if no number of workers would be OK.
    */
   static int computeMaxWorkers(
-      final long maxMemoryInJvm,
+      final long usableMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm
   )
   {
-    final long bundleMemory = memoryPerBundle(maxMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
+    final long bundleMemory = memoryPerBundle(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);

     // Inverse of memoryNeededForInputChannels.
-    return Ints.checkedCast((bundleMemory - PROCESSING_MINIMUM_BYTES) / STANDARD_FRAME_SIZE - 1);
+    return Math.max(0, Ints.checkedCast((bundleMemory - PROCESSING_MINIMUM_BYTES) / STANDARD_FRAME_SIZE - 1));
+  }
+
+  /**
+   * Maximum number of workers that may exist in the current JVM.
+   */
+  private static int computeNumWorkersInJvm(final Injector injector)
+  {
+    final AppenderatorsManager appenderatorsManager = injector.getInstance(AppenderatorsManager.class);
+
+    if (appenderatorsManager instanceof UnifiedIndexerAppenderatorsManager) {
+      // CliIndexer
+      return injector.getInstance(WorkerConfig.class).getCapacity();
+    } else {
+      // CliPeon
+      return 1;
+    }
   }
+  /**
+   * Maximum number of concurrent processors that exist in the current JVM.
+   */
+  private static int computeNumProcessorsInJvm(final Injector injector)
+  {
+    return injector.getInstance(Bouncer.class).getMaxCount();
+  }
+
+  /**
+   * Compute the memory allocated to each worker. Includes anything that exists outside of processing bundles.
+   *
+   * Today, we only look at one thing: the amount of memory taken up by
+   * {@link org.apache.druid.msq.statistics.ClusterByStatisticsCollector}. This is the single largest source of memory
+   * usage outside processing bundles.
+   */
+  private static long memoryPerWorker(
+      final long usableMemoryInJvm,
+      final int numWorkersInJvm
+  )
+  {
+    final long memoryForWorkers = (long) Math.min(
+        usableMemoryInJvm * PARTITION_STATS_MEMORY_MAX_FRACTION,
+        numWorkersInJvm * PARTITION_STATS_MEMORY_MAX_BYTES
+    );
+
+    return memoryForWorkers / numWorkersInJvm;
+  }
+
+  /**
+   * Compute the memory allocated to each processing bundle.
+   */
   private static long memoryPerBundle(
-      final long maxMemoryInJvm,
+      final long usableMemoryInJvm,
       final int numWorkersInJvm,
       final int numProcessingThreadsInJvm
   )
   {
     final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;
-    return (long) (maxMemoryInJvm * USABLE_MEMORY_FRACTION) / bundleCount;
+
+    // Need to subtract memoryForWorkers off the top of usableMemoryInJvm, since this is reserved for
+    // statistics collection.
+    final long memoryForWorkers = numWorkersInJvm * memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
+    final long memoryForBundles = usableMemoryInJvm - memoryForWorkers;
+
+    // Divide up the usable memory per bundle.
+    return memoryForBundles / bundleCount;
   }
   private static long memoryNeededForInputChannels(final int numInputWorkers)
   {
-    // Regular processors require input-channel-merging for their inputs. Calculate how much that is.
-    // Requirement: inputChannelsPerProcessor number of input frames, one output frame.
+    // Workers that read sorted inputs must open all channels at once to do an N-way merge. Calculate memory needs.
+    // Requirement: one input frame per worker, one buffered output frame.
     return (long) STANDARD_FRAME_SIZE * (numInputWorkers + 1);
   }
+
+  /**
+   * Amount of heap memory available for our usage.
+   */
+  private static long computeUsableMemoryInJvm(final Injector injector)
+  {
+    return (long) ((Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint(injector)) * USABLE_MEMORY_FRACTION);
+  }
+
+  /**
+   * Total estimated lookup footprint. Obtained by calling {@link LookupExtractor#estimateHeapFootprint()} on
+   * all available lookups.
+   */
+  private static long computeTotalLookupFootprint(final Injector injector)
+  {
+    // Subtract memory taken up by lookups. Correctness of this operation depends on lookups being loaded *before*
+    // we create this instance. Luckily, this is the typical mode of operation, since by default
+    // druid.lookup.enableLookupSyncOnStartup = true.
+    final LookupReferencesManager lookupManager = injector.getInstance(LookupReferencesManager.class);
+
+    int lookupCount = 0;
+    long lookupFootprint = 0;
+
+    for (final String lookupName : lookupManager.getAllLookupNames()) {
+      final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null);
+
+      if (container != null) {
+        try {
+          final LookupExtractor extractor = container.getLookupExtractorFactory().get();
+          lookupFootprint += extractor.estimateHeapFootprint();
+          lookupCount++;
+        }
+        catch (Exception e) {
+          log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName);
+        }
+      }
+    }
+
+    log.debug("Lookup footprint: %d lookups with %,d total bytes.", lookupCount, lookupFootprint);
+
+    return lookupFootprint;
+  }
}
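Putting the pieces of this file's diff together, the revised budget can be sketched end to end: take 75% of the heap after lookups, reserve the statistics allowance per worker, then split the remainder into bundles. The 1 MB frame size below is an assumption taken from the buffer sizing described in the docs; the class and method names are illustrative, not Druid's API.

```java
// End-to-end sketch of the memory budget introduced by this commit.
public class MemoryBudgetSketch
{
  static final double USABLE_MEMORY_FRACTION = 0.75;
  static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
  static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000L;
  static final long STANDARD_FRAME_SIZE = 1_000_000L; // assumed 1 MB frames

  // Heap available to MSQ: total heap minus lookup footprint, times 75%.
  static long usableMemory(long maxHeap, long lookupFootprint)
  {
    return (long) ((maxHeap - lookupFootprint) * USABLE_MEMORY_FRACTION);
  }

  // Per-worker partition-statistics allowance: min(10% of usable, 300 MB each).
  static long memoryPerWorker(long usable, int numWorkers)
  {
    final long forWorkers = (long) Math.min(
        usable * PARTITION_STATS_MEMORY_MAX_FRACTION,
        (double) numWorkers * PARTITION_STATS_MEMORY_MAX_BYTES
    );
    return forWorkers / numWorkers;
  }

  // Bundle size: whatever remains after the worker reservations, split evenly.
  static long memoryPerBundle(long usable, int numWorkers, int numThreads)
  {
    final long forWorkers = numWorkers * memoryPerWorker(usable, numWorkers);
    return (usable - forWorkers) / (numWorkers + numThreads);
  }
}
```

For a Peon with an 8 GiB heap, no lookups, one worker, and two processing threads: usable memory is 6,442,450,944 bytes, the worker gets the full 300 MB statistics allowance, and each of the three bundles gets roughly 2 GiB.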
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 36479142e0..fe50bc19ac 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -24,13 +24,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Injector;
import com.google.inject.Key;
-import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
@@ -40,13 +38,9 @@ import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
-import org.apache.druid.msq.input.InputSpecs;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.rpc.CoordinatorServiceClient;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
@@ -56,7 +50,6 @@ import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskServiceLocator;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.loading.SegmentCacheManager;
-import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.server.DruidNode;
import java.io.File;
@@ -74,7 +67,6 @@ public class IndexerWorkerContext implements WorkerContext
private final IndexIO indexIO;
private final TaskDataSegmentProvider dataSegmentProvider;
private final ServiceClientFactory clientFactory;
- private final long availableHeapMemory;
@GuardedBy("this")
private OverlordClient overlordClient;
@@ -87,8 +79,7 @@ public class IndexerWorkerContext implements WorkerContext
final Injector injector,
final IndexIO indexIO,
final TaskDataSegmentProvider dataSegmentProvider,
- final ServiceClientFactory clientFactory,
- final long availableHeapMemory
+ final ServiceClientFactory clientFactory
)
{
this.toolbox = toolbox;
@@ -96,7 +87,6 @@ public class IndexerWorkerContext implements WorkerContext
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
this.clientFactory = clientFactory;
- this.availableHeapMemory = availableHeapMemory;
}
   public static IndexerWorkerContext createProductionInstance(final TaskToolbox toolbox, final Injector injector)
@@ -115,8 +105,7 @@ public class IndexerWorkerContext implements WorkerContext
         injector,
         indexIO,
         new TaskDataSegmentProvider(coordinatorServiceClient, segmentCacheManager, indexIO),
-        serviceClientFactory,
-        computeAvailableHeapMemory(injector)
+        serviceClientFactory
     );
   }
@@ -234,23 +223,11 @@ public class IndexerWorkerContext implements WorkerContext
   @Override
   public FrameContext frameContext(QueryDefinition queryDef, int stageNumber)
   {
-    final IntSet inputStageNumbers = InputSpecs.getStageNumbers(queryDef.getStageDefinition(stageNumber).getInputSpecs());
-    final int numInputWorkers =
-        inputStageNumbers.intStream()
-                         .map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
-                         .sum();
-
     return new IndexerFrameContext(
         this,
         indexIO,
         dataSegmentProvider,
-        WorkerMemoryParameters.compute(
-            availableHeapMemory,
-            computeNumWorkersInJvm(),
-            processorBouncer().getMaxCount(),
-            numInputWorkers
-        )
+        WorkerMemoryParameters.createProductionInstanceForWorker(injector, queryDef, stageNumber)
     );
   }
@@ -272,20 +249,6 @@ public class IndexerWorkerContext implements WorkerContext
     return injector.getInstance(Bouncer.class);
   }

-  /**
-   * Number of workers that may run in the current JVM, including the current worker.
-   */
-  private int computeNumWorkersInJvm()
-  {
-    if (toolbox.getAppenderatorsManager() instanceof UnifiedIndexerAppenderatorsManager) {
-      // CliIndexer
-      return injector.getInstance(WorkerConfig.class).getCapacity();
-    } else {
-      // CliPeon
-      return 1;
-    }
-  }
-
private synchronized OverlordClient makeOverlordClient()
{
if (overlordClient == null) {
@@ -303,46 +266,4 @@ public class IndexerWorkerContext implements WorkerContext
     return controllerLocator;
   }
-
-  /**
-   * Amount of memory available for our usage.
-   */
-  private static long computeAvailableHeapMemory(final Injector injector)
-  {
-    return Runtime.getRuntime().maxMemory() - computeTotalLookupFootprint(injector);
-  }
-
-  /**
-   * Total estimated lookup footprint. Obtained by calling {@link LookupExtractor#estimateHeapFootprint()} on
-   * all available lookups.
-   */
-  private static long computeTotalLookupFootprint(final Injector injector)
-  {
-    // Subtract memory taken up by lookups. Correctness of this operation depends on lookups being loaded *before*
-    // we create this instance. Luckily, this is the typical mode of operation, since by default
-    // druid.lookup.enableLookupSyncOnStartup = true.
-    final LookupReferencesManager lookupManager = injector.getInstance(LookupReferencesManager.class);
-
-    int lookupCount = 0;
-    long lookupFootprint = 0;
-
-    for (final String lookupName : lookupManager.getAllLookupNames()) {
-      final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null);
-
-      if (container != null) {
-        try {
-          final LookupExtractor extractor = container.getLookupExtractorFactory().get();
-          lookupFootprint += extractor.estimateHeapFootprint();
-          lookupCount++;
-        }
-        catch (Exception e) {
-          log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName);
-        }
-      }
-    }
-
-    log.debug("Lookup footprint: %d lookups with %,d total bytes.", lookupCount, lookupFootprint);
-
-    return lookupFootprint;
-  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
index b353c03df1..dfd2129161 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
@@ -31,25 +31,29 @@ public class NotEnoughMemoryFault extends BaseMSQFault
static final String CODE = "NotEnoughMemory";
private final long serverMemory;
+ private final long usableMemory;
private final int serverWorkers;
private final int serverThreads;
@JsonCreator
public NotEnoughMemoryFault(
@JsonProperty("serverMemory") final long serverMemory,
+ @JsonProperty("usableMemory") final long usableMemory,
@JsonProperty("serverWorkers") final int serverWorkers,
@JsonProperty("serverThreads") final int serverThreads
)
{
     super(
         CODE,
-        "Not enough memory (available = %,d; server workers = %,d; server threads = %,d)",
+        "Not enough memory (total = %,d; usable = %,d; server workers = %,d; server threads = %,d)",
         serverMemory,
+        usableMemory,
         serverWorkers,
         serverThreads
     );
this.serverMemory = serverMemory;
+ this.usableMemory = usableMemory;
this.serverWorkers = serverWorkers;
this.serverThreads = serverThreads;
}
@@ -60,6 +64,12 @@ public class NotEnoughMemoryFault extends BaseMSQFault
return serverMemory;
}
+ @JsonProperty
+ public long getUsableMemory()
+ {
+ return usableMemory;
+ }
+
@JsonProperty
public int getServerWorkers()
{
@@ -86,6 +96,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
}
NotEnoughMemoryFault that = (NotEnoughMemoryFault) o;
return serverMemory == that.serverMemory
+ && usableMemory == that.usableMemory
&& serverWorkers == that.serverWorkers
&& serverThreads == that.serverThreads;
}
@@ -93,7 +104,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), serverMemory, serverWorkers, serverThreads);
+    return Objects.hash(super.hashCode(), serverMemory, usableMemory, serverWorkers, serverThreads);
   }
@Override
@@ -101,6 +112,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
{
return "NotEnoughMemoryFault{" +
"serverMemory=" + serverMemory +
+ ", usableMemory=" + usableMemory +
", serverWorkers=" + serverWorkers +
", serverThreads=" + serverThreads +
'}';
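[Editor's note] A minimal, self-contained sketch of how the updated fault message renders once `usableMemory` is added. The class name and the explicit `Locale.US` are illustrative assumptions (the fault itself formats via its superclass); the template string and the sample values (1 GB total, 750 MB usable) come from this diff and its tests.

```java
import java.util.Locale;

public class NotEnoughMemoryMessageDemo
{
  public static void main(String[] args)
  {
    // The fault's message template after this change; %,d adds digit grouping.
    final String template =
        "Not enough memory (total = %,d; usable = %,d; server workers = %,d; server threads = %,d)";

    // Values taken from the updated unit test: 1 GB total, 750 MB usable.
    final String message =
        String.format(Locale.US, template, 1_000_000_000L, 750_000_000L, 1, 32);

    System.out.println(message);
    // prints: Not enough memory (total = 1,000,000,000; usable = 750,000,000; server workers = 1; server threads = 32)
  }
}
```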
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index c01506054d..c79bf46ddc 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -69,12 +69,11 @@ import java.util.function.Supplier;
* The rarely-used parameter {@link #getShuffleCheckHasMultipleValues()} controls whether the execution system
* checks, while shuffling, if the key used for shuffling has any multi-value fields. When this is true, the method
* {@link ClusterByStatisticsCollector#hasMultipleValues} is enabled on collectors
- * {@link #createResultKeyStatisticsCollector()}. Its primary purpose is to allow ingestion jobs to detect whether the
+ * {@link #createResultKeyStatisticsCollector}. Its primary purpose is to allow ingestion jobs to detect whether the
* secondary partitioning (CLUSTERED BY) key is multivalued or not.
*/
public class StageDefinition
{
- private static final int PARTITION_STATS_MAX_BYTES = 300_000_000; // Avoid immediate downsample of single-bucket collectors
private static final int PARTITION_STATS_MAX_BUCKETS = 5_000; // Limit for TooManyBuckets
private static final int MAX_PARTITIONS = 25_000; // Limit for TooManyPartitions
@@ -280,7 +279,7 @@ public class StageDefinition
}
}
- public ClusterByStatisticsCollector createResultKeyStatisticsCollector()
+ public ClusterByStatisticsCollector createResultKeyStatisticsCollector(final int maxRetainedBytes)
{
if (!mustGatherResultKeyStatistics()) {
throw new ISE("No statistics needed");
@@ -289,7 +288,7 @@ public class StageDefinition
return ClusterByStatisticsCollectorImpl.create(
shuffleSpec.getClusterBy(),
signature,
- PARTITION_STATS_MAX_BYTES,
+ maxRetainedBytes,
PARTITION_STATS_MAX_BUCKETS,
shuffleSpec.doesAggregateByClusterKey(),
shuffleCheckHasMultipleValues
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index 9c02beea88..9de5c692c9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -65,6 +65,7 @@ import java.util.stream.Collectors;
public class ControllerQueryKernel
{
private final QueryDefinition queryDef;
+ private final int partitionStatisticsMaxRetainedBytes;
/**
* Stage ID -> tracker for that stage. An extension of the state of this kernel.
@@ -106,9 +107,10 @@ public class ControllerQueryKernel
*/
private final Set<StageId> effectivelyFinishedStages = new HashSet<>();
- public ControllerQueryKernel(final QueryDefinition queryDef)
+ public ControllerQueryKernel(final QueryDefinition queryDef, final int partitionStatisticsMaxRetainedBytes)
{
this.queryDef = queryDef;
+ this.partitionStatisticsMaxRetainedBytes = partitionStatisticsMaxRetainedBytes;
this.inflowMap = ImmutableMap.copyOf(computeStageInflowMap(queryDef));
this.outflowMap = ImmutableMap.copyOf(computeStageOutflowMap(queryDef));
@@ -264,7 +266,8 @@ public class ControllerQueryKernel
stageDef,
stageWorkerCountMap,
slicer,
- assignmentStrategy
+ assignmentStrategy,
+ partitionStatisticsMaxRetainedBytes
);
stageTracker.put(nextStage, stageKernel);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 1b32deb534..3ad01a513c 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -81,7 +81,8 @@ class ControllerStageTracker
private ControllerStageTracker(
final StageDefinition stageDef,
- final WorkerInputs workerInputs
+ final WorkerInputs workerInputs,
+ final int partitionStatisticsMaxRetainedBytes
)
{
this.stageDef = stageDef;
@@ -89,7 +90,8 @@ class ControllerStageTracker
this.workerInputs = workerInputs;
if (stageDef.mustGatherResultKeyStatistics()) {
- this.resultKeyStatisticsCollector = stageDef.createResultKeyStatisticsCollector();
+ this.resultKeyStatisticsCollector =
+ stageDef.createResultKeyStatisticsCollector(partitionStatisticsMaxRetainedBytes);
} else {
this.resultKeyStatisticsCollector = null;
generateResultPartitionsAndBoundaries();
@@ -105,11 +107,12 @@ class ControllerStageTracker
final StageDefinition stageDef,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
- final WorkerAssignmentStrategy assignmentStrategy
+ final WorkerAssignmentStrategy assignmentStrategy,
+ final int partitionStatisticsMaxRetainedBytes
)
{
final WorkerInputs workerInputs = WorkerInputs.create(stageDef, stageWorkerCountMap, slicer, assignmentStrategy);
- return new ControllerStageTracker(stageDef, workerInputs);
+ return new ControllerStageTracker(stageDef, workerInputs, partitionStatisticsMaxRetainedBytes);
}
/**
@@ -216,6 +219,10 @@ class ControllerStageTracker
*/
void finish()
{
+ if (resultKeyStatisticsCollector != null) {
+ resultKeyStatisticsCollector.clear();
+ }
+
transitionTo(ControllerStagePhase.FINISHED);
}
@@ -239,6 +246,10 @@ class ControllerStageTracker
final ClusterByStatisticsSnapshot snapshot
)
{
+ if (phase != ControllerStagePhase.READING_INPUT) {
+ throw new ISE("Cannot add result key statistics from stage [%s]", phase);
+ }
+
if (resultKeyStatisticsCollector == null) {
throw new ISE("Stage does not gather result key statistics");
}
@@ -247,10 +258,6 @@ class ControllerStageTracker
throw new IAE("Invalid workerNumber [%s]", workerNumber);
}
- if (phase != ControllerStagePhase.READING_INPUT) {
- throw new ISE("Cannot add result key statistics from stage [%s]", phase);
- }
-
try {
if (workersWithResultKeyStatistics.add(workerNumber)) {
resultKeyStatisticsCollector.addAll(snapshot);
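[Editor's note] A hedged sketch of the two behavioral changes in `ControllerStageTracker` above: the phase check now runs before any other validation, and the statistics collector is cleared on `finish()` so its retained bytes are released promptly. All class and member names below (`Phase`, `StatsCollector`, `TrackerSketch`) are hypothetical stand-ins, not the real Druid types.

```java
// Hypothetical stand-ins for the tracker's collaborators.
enum Phase { READING_INPUT, FINISHED }

class StatsCollector
{
  boolean cleared = false;

  void clear()
  {
    // Drop retained key statistics to free memory.
    cleared = true;
  }
}

class TrackerSketch
{
  private Phase phase = Phase.READING_INPUT;
  private final StatsCollector collector = new StatsCollector();

  void addResultKeyStatistics(int workerNumber)
  {
    // Mirrors the diff: the phase check happens first, before validating
    // the collector or the worker number.
    if (phase != Phase.READING_INPUT) {
      throw new IllegalStateException("Cannot add result key statistics from phase " + phase);
    }
    // ... validate workerNumber, merge the snapshot ...
  }

  void finish()
  {
    // New in this change: release statistics memory as soon as the stage ends.
    if (collector != null) {
      collector.clear();
    }
    phase = Phase.FINISHED;
  }
}
```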
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
index 8df168b138..f0413ddcd6 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerMemoryParametersTest.java
@@ -31,52 +31,63 @@ public class WorkerMemoryParametersTest
@Test
public void test_oneWorkerInJvm_alone()
{
- Assert.assertEquals(parameters(1, 45, 373_000_000), compute(1_000_000_000, 1, 1, 1));
- Assert.assertEquals(parameters(2, 14, 248_000_000), compute(1_000_000_000, 1, 2, 1));
- Assert.assertEquals(parameters(4, 3, 148_000_000), compute(1_000_000_000, 1, 4, 1));
- Assert.assertEquals(parameters(3, 2, 81_333_333), compute(1_000_000_000, 1, 8, 1));
- Assert.assertEquals(parameters(1, 4, 42_117_647), compute(1_000_000_000, 1, 16, 1));
+ Assert.assertEquals(parameters(1, 41, 224_785_000, 100_650_000, 75_000_000), compute(1_000_000_000, 1, 1, 1));
+ Assert.assertEquals(parameters(2, 13, 149_410_000, 66_900_000, 75_000_000), compute(1_000_000_000, 1, 2, 1));
+ Assert.assertEquals(parameters(4, 3, 89_110_000, 39_900_000, 75_000_000), compute(1_000_000_000, 1, 4, 1));
+ Assert.assertEquals(parameters(3, 2, 48_910_000, 21_900_000, 75_000_000), compute(1_000_000_000, 1, 8, 1));
+ Assert.assertEquals(parameters(2, 2, 33_448_460, 14_976_922, 75_000_000), compute(1_000_000_000, 1, 12, 1));
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> compute(1_000_000_000, 1, 32, 1)
);
- Assert.assertEquals(new NotEnoughMemoryFault(1_000_000_000, 1, 32), e.getFault());
+ Assert.assertEquals(new NotEnoughMemoryFault(1_000_000_000, 750_000_000, 1, 32), e.getFault());
}
@Test
public void test_oneWorkerInJvm_twoHundredWorkersInCluster()
{
- Assert.assertEquals(parameters(1, 45, 174_000_000), compute(1_000_000_000, 1, 1, 200));
- Assert.assertEquals(parameters(2, 14, 49_000_000), compute(1_000_000_000, 1, 2, 200));
+ Assert.assertEquals(parameters(1, 83, 317_580_000, 142_200_000, 150_000_000), compute(2_000_000_000, 1, 1, 200));
+ Assert.assertEquals(parameters(2, 27, 166_830_000, 74_700_000, 150_000_000), compute(2_000_000_000, 1, 2, 200));
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> compute(1_000_000_000, 1, 4, 200)
);
- Assert.assertEquals(new TooManyWorkersFault(200, 124), e.getFault());
+ Assert.assertEquals(new TooManyWorkersFault(200, 109), e.getFault());
}
@Test
public void test_fourWorkersInJvm_twoHundredWorkersInCluster()
{
- Assert.assertEquals(parameters(1, 149, 999_000_000), compute(8_000_000_000L, 4, 1, 200));
- Assert.assertEquals(parameters(2, 61, 799_000_000), compute(8_000_000_000L, 4, 2, 200));
- Assert.assertEquals(parameters(4, 22, 549_000_000), compute(8_000_000_000L, 4, 4, 200));
- Assert.assertEquals(parameters(4, 14, 299_000_000), compute(8_000_000_000L, 4, 8, 200));
- Assert.assertEquals(parameters(4, 8, 99_000_000), compute(8_000_000_000L, 4, 16, 200));
+ Assert.assertEquals(parameters(1, 150, 679_380_000, 304_200_000, 168_750_000), compute(9_000_000_000L, 4, 1, 200));
+ Assert.assertEquals(parameters(2, 62, 543_705_000, 243_450_000, 168_750_000), compute(9_000_000_000L, 4, 2, 200));
+ Assert.assertEquals(parameters(4, 22, 374_111_250, 167_512_500, 168_750_000), compute(9_000_000_000L, 4, 4, 200));
+ Assert.assertEquals(parameters(4, 14, 204_517_500, 91_575_000, 168_750_000), compute(9_000_000_000L, 4, 8, 200));
+ Assert.assertEquals(parameters(4, 8, 68_842_500, 30_825_000, 168_750_000), compute(9_000_000_000L, 4, 16, 200));
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> compute(8_000_000_000L, 4, 32, 200)
);
- Assert.assertEquals(new TooManyWorkersFault(200, 140), e.getFault());
+ Assert.assertEquals(new TooManyWorkersFault(200, 124), e.getFault());
+
+ // Make sure 107 actually works. (Verify the error message above.)
+ Assert.assertEquals(parameters(4, 3, 28_140_000, 12_600_000, 150_000_000), compute(8_000_000_000L, 4, 32, 107));
+ }
+
+ @Test
+ public void test_oneWorkerInJvm_negativeUsableMemory()
+ {
+ final MSQException e = Assert.assertThrows(
+ MSQException.class,
+ () -> WorkerMemoryParameters.createInstance(100, -50, 1, 32, 1)
+ );
- // Make sure 140 actually works. (Verify the error message above.)
- Assert.assertEquals(parameters(4, 4, 25_666_666), compute(8_000_000_000L, 4, 32, 140));
+ Assert.assertEquals(new NotEnoughMemoryFault(100, -50, 1, 32), e.getFault());
}
@Test
@@ -88,14 +99,17 @@ public class WorkerMemoryParametersTest
private static WorkerMemoryParameters parameters(
final int superSorterMaxActiveProcessors,
final int superSorterMaxChannelsPerProcessor,
- final long bundleMemory
+ final long appenderatorMemory,
+ final long broadcastJoinMemory,
+ final int partitionStatisticsMaxRetainedBytes
)
{
return new WorkerMemoryParameters(
superSorterMaxActiveProcessors,
superSorterMaxChannelsPerProcessor,
- (long) (bundleMemory * WorkerMemoryParameters.APPENDERATOR_MEMORY_FRACTION),
- (long) (bundleMemory * WorkerMemoryParameters.BROADCAST_JOIN_MEMORY_FRACTION)
+ appenderatorMemory,
+ broadcastJoinMemory,
+ partitionStatisticsMaxRetainedBytes
);
}
@@ -106,8 +120,9 @@ public class WorkerMemoryParametersTest
final int numInputWorkers
)
{
- return WorkerMemoryParameters.compute(
+ return WorkerMemoryParameters.createInstance(
maxMemoryInJvm,
+ (long) (maxMemoryInJvm * WorkerMemoryParameters.USABLE_MEMORY_FRACTION),
numWorkersInJvm,
numProcessingThreadsInJvm,
numInputWorkers
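[Editor's note] A hedged sketch of the usable-memory relationship the updated test exercises. The fraction 0.75 is inferred from this diff's test values (total 1_000_000_000 with usable 750_000_000 in the `NotEnoughMemoryFault` assertion); the real constant is `WorkerMemoryParameters.USABLE_MEMORY_FRACTION`, whose definition is not shown here, and the class below is a hypothetical stand-in, not the Druid API.

```java
public class UsableMemoryDemo
{
  // Inferred from the test values in this diff (1_000_000_000 total ->
  // 750_000_000 usable); the authoritative constant lives in
  // WorkerMemoryParameters and may differ.
  static final double USABLE_MEMORY_FRACTION = 0.75;

  // Mirrors the updated test helper: usable memory is a fixed fraction of the
  // JVM max, leaving headroom for memory the bundles do not track.
  static long usableMemory(long maxMemoryInJvm)
  {
    return (long) (maxMemoryInJvm * USABLE_MEMORY_FRACTION);
  }

  public static void main(String[] args)
  {
    System.out.println(usableMemory(1_000_000_000L)); // prints 750000000
  }
}
```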
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
index fa2bdb2b7e..0ea9ab45f4 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerWorkerContextTest.java
@@ -49,8 +49,7 @@ public class IndexerWorkerContextTest
injectorMock,
null,
null,
- null,
- Runtime.getRuntime().maxMemory()
+ null
);
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 77abdfc835..38f80ea283 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -64,7 +64,7 @@ public class MSQFaultSerdeTest
assertFaultSerde(InsertTimeNullFault.INSTANCE);
assertFaultSerde(new InsertTimeOutOfBoundsFault(Intervals.ETERNITY));
assertFaultSerde(new InvalidNullByteFault("the column"));
- assertFaultSerde(new NotEnoughMemoryFault(1000, 1, 2));
+ assertFaultSerde(new NotEnoughMemoryFault(1000, 900, 1, 2));
assertFaultSerde(QueryNotSupportedFault.INSTANCE);
assertFaultSerde(new RowTooLargeFault(1000));
assertFaultSerde(new TaskStartTimeoutFault(10));
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
index 55fd6b75d0..c31d8c69fa 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
@@ -80,7 +80,7 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
public ControllerQueryKernelTester queryDefinition(QueryDefinition queryDefinition)
{
this.queryDefinition = Preconditions.checkNotNull(queryDefinition);
- this.controllerQueryKernel = new ControllerQueryKernel(queryDefinition);
+ this.controllerQueryKernel = new ControllerQueryKernel(queryDefinition, 10_000_000);
return this;
}
@@ -220,12 +220,6 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
return controllerQueryKernel.isSuccess();
}
- public ControllerStagePhase getStagePhase(int stageNumber)
- {
- Preconditions.checkArgument(initialized);
- return controllerQueryKernel.getStagePhase(new StageId(queryDefinition.getQueryId(), stageNumber));
- }
-
public void startStage(int stageNumber)
{
Preconditions.checkArgument(initialized);
@@ -250,7 +244,7 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
// Simulate 1000 keys being encountered in the data, so the kernel can generate some partitions.
final ClusterByStatisticsCollector keyStatsCollector =
- queryDefinition.getStageDefinition(stageNumber).createResultKeyStatisticsCollector();
+ queryDefinition.getStageDefinition(stageNumber).createResultKeyStatisticsCollector(10_000_000);
for (int i = 0; i < 1000; i++) {
final RowKey key = KeyTestUtils.createKey(
MockQueryDefinitionBuilder.STAGE_SIGNATURE,
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index b33df1e3f2..35e57b9f2f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -101,6 +101,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexIO;
@@ -115,6 +116,7 @@ import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.loading.SegmentCacheManager;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@@ -231,12 +233,15 @@ public class MSQTestBase extends BaseCalciteQueryTest
public TemporaryFolder tmpFolder = new TemporaryFolder();
private TestGroupByBuffers groupByBuffers;
- protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(WorkerMemoryParameters.compute(
- WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
- 2,
- 10,
- 2
- ));
+ protected final WorkerMemoryParameters workerMemoryParameters = Mockito.spy(
+ WorkerMemoryParameters.createInstance(
+ WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
+ WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
+ 2,
+ 10,
+ 2
+ )
+ );
@After
public void tearDown2()
@@ -341,6 +346,15 @@ public class MSQTestBase extends BaseCalciteQueryTest
binder.bind(ExprMacroTable.class).toInstance(CalciteTests.createExprMacroTable());
binder.bind(DataSegment.PruneSpecsHolder.class).toInstance(DataSegment.PruneSpecsHolder.DEFAULT);
},
+ binder -> {
+ // Requirements of WorkerMemoryParameters.createProductionInstanceForWorker(injector)
+ final LookupReferencesManager lookupReferencesManager = EasyMock.createStrictMock(LookupReferencesManager.class);
+ EasyMock.expect(lookupReferencesManager.getAllLookupNames()).andReturn(Collections.emptySet());
+ EasyMock.replay(lookupReferencesManager);
+ binder.bind(LookupReferencesManager.class).toInstance(lookupReferencesManager);
+ binder.bind(AppenderatorsManager.class).toProvider(() -> null);
+ },
binder -> {
// Requirements of JoinableFactoryModule
binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class));
@@ -414,7 +428,10 @@ public class MSQTestBase extends BaseCalciteQueryTest
try {
return ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
- .put(DruidQuery.CTX_SCAN_SIGNATURE, queryFramework().queryJsonMapper().writeValueAsString(signature))
+ .put(
+ DruidQuery.CTX_SCAN_SIGNATURE,
+ queryFramework().queryJsonMapper().writeValueAsString(signature)
+ )
.build();
}
catch (JsonProcessingException e) {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index 3cf03b9f22..655077008d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -155,8 +155,7 @@ public class MSQTestWorkerContext implements WorkerContext
injector,
indexIO,
null,
- null,
- Runtime.getRuntime().maxMemory()
+ null
),
indexIO,
injector.getInstance(DataSegmentProvider.class),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]