Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/960#discussion_r142576288
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
---
@@ -56,30 +59,90 @@ public static void
setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, fi
}
// if there are any sorts, compute the maximum allocation, and set it
on them
- if (bufferedOpList.size() > 0) {
- final OptionManager optionManager = queryContext.getOptions();
- double cpu_load_average =
optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
- final long maxWidth =
optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
- final long maxWidthPerNode =
ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpu_load_average,maxWidth);
- long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
-
queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
- maxAllocPerNode = Math.min(maxAllocPerNode,
-
optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
- final long maxOperatorAlloc = maxAllocPerNode /
(bufferedOpList.size() * maxWidthPerNode);
- logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);
-
- // User configurable option to allow forcing minimum memory.
- // Ensure that the buffered ops receive the minimum memory needed to
make progress.
- // Without this, the math might work out to allocate too little
memory.
- final long opMinMem =
queryContext.getOptions().getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY).num_val;
-
- for(final PhysicalOperator op : bufferedOpList) {
-
- long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
- alloc = Math.max(alloc, opMinMem);
- op.setMaxAllocation(alloc);
- }
- }
plan.getProperties().hasResourcePlan = true;
+ if (bufferedOpList.isEmpty()) {
+ return;
+ }
+
+ // Setup options, etc.
+
+ final OptionManager optionManager = queryContext.getOptions();
+ final long directMemory = DrillConfig.getMaxDirectMemory();
+
+ // Compute per-node, per-query memory.
+
+ final long maxAllocPerNode =
computeQueryMemory(queryContext.getConfig(), optionManager, directMemory);
+ logger.debug("Memory per query per node: {}", maxAllocPerNode);
+
+ // Now divide up the memory by slices and operators.
+
+ final long opMinMem = computeOperatorMemory(optionManager,
maxAllocPerNode, bufferedOpList.size());
+
+ for(final PhysicalOperator op : bufferedOpList) {
+ final long alloc = Math.max(opMinMem, op.getInitialAllocation());
+ op.setMaxAllocation(alloc);
+ }
+ }
+
+ /**
+ * Compute per-operator memory based on the computed per-node memory, the
+ * number of operators, and the computed number of fragments (which house
+ * the operators.) Enforces a floor on the amount of memory per operator.
+ *
+ * @param optionManager system option manager
+ * @param maxAllocPerNode computed query memory per node
+ * @param opCount number of buffering operators in this query
+ * @return the per-operator memory
+ */
+
+ public static long computeOperatorMemory(OptionSet optionManager, long
maxAllocPerNode, int opCount) {
+ final long maxWidth =
optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
+ final double cpuLoadAverage =
optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
+ final long maxWidthPerNode =
ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpuLoadAverage, maxWidth);
+ final long maxOperatorAlloc = maxAllocPerNode / (opCount *
maxWidthPerNode);
+ logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);
+
+ // User configurable option to allow forcing minimum memory.
+ // Ensure that the buffered ops receive the minimum memory needed to
make progress.
+ // Without this, the math might work out to allocate too little memory.
+
+ return Math.max(maxOperatorAlloc,
+ optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP));
+ }
+
+ /**
+ * Per-node memory calculations based on a number of constraints.
+ * <p>
+ * Factored out into a separate method to allow unit testing.
+ * @param config Drill config
+ * @param optionManager system options
+ * @param directMemory amount of direct memory
+ * @return memory per query per node
+ */
+
+ @VisibleForTesting
+ public static long computeQueryMemory(DrillConfig config, OptionSet
optionManager, long directMemory) {
+
+ // Memory computed as a percent of total memory.
+
+ long perQueryMemory = Math.round(directMemory *
+ optionManager.getOption(ExecConstants.PERCENT_MEMORY_PER_QUERY));
+
+ // But, must allow at least the amount given explicitly for
+ // backward compatibility.
+
+ perQueryMemory = Math.max(perQueryMemory,
+ optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE));
+
+ // Compute again as either the total direct memory, or the
+ // configured maximum top-level allocation (10 GB).
+
+ long maxAllocPerNode = Math.min(directMemory,
+ config.getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
--- End diff --
So a query can not go above 10GB per node ? What about a cluster with
256GB per node installed, running a single giant query -- why not use more ?
(Yes this was in the original code, but I did not know then that it meant 10GB
...)
---