GitHub user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/960#discussion_r142580052
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
---
@@ -56,30 +59,90 @@ public static void
setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, fi
}
// if there are any sorts, compute the maximum allocation, and set it
on them
- if (bufferedOpList.size() > 0) {
- final OptionManager optionManager = queryContext.getOptions();
- double cpu_load_average =
optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
- final long maxWidth =
optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
- final long maxWidthPerNode =
ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpu_load_average,maxWidth);
- long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
-
queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
- maxAllocPerNode = Math.min(maxAllocPerNode,
-
optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
- final long maxOperatorAlloc = maxAllocPerNode /
(bufferedOpList.size() * maxWidthPerNode);
- logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);
-
- // User configurable option to allow forcing minimum memory.
- // Ensure that the buffered ops receive the minimum memory needed to
make progress.
- // Without this, the math might work out to allocate too little
memory.
- final long opMinMem =
queryContext.getOptions().getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY).num_val;
-
- for(final PhysicalOperator op : bufferedOpList) {
-
- long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
- alloc = Math.max(alloc, opMinMem);
- op.setMaxAllocation(alloc);
- }
- }
plan.getProperties().hasResourcePlan = true;
+ if (bufferedOpList.isEmpty()) {
+ return;
+ }
+
+ // Setup options, etc.
+
+ final OptionManager optionManager = queryContext.getOptions();
+ final long directMemory = DrillConfig.getMaxDirectMemory();
+
+ // Compute per-node, per-query memory.
+
+ final long maxAllocPerNode =
computeQueryMemory(queryContext.getConfig(), optionManager, directMemory);
+ logger.debug("Memory per query per node: {}", maxAllocPerNode);
+
+ // Now divide up the memory by slices and operators.
+
+ final long opMinMem = computeOperatorMemory(optionManager,
maxAllocPerNode, bufferedOpList.size());
+
+ for(final PhysicalOperator op : bufferedOpList) {
+ final long alloc = Math.max(opMinMem, op.getInitialAllocation());
+ op.setMaxAllocation(alloc);
+ }
+ }
+
+ /**
+ * Compute per-operator memory based on the computed per-node memory, the
+ * number of operators, and the computed number of fragments (which house
+ * the operators.) Enforces a floor on the amount of memory per operator.
+ *
+ * @param optionManager system option manager
+ * @param maxAllocPerNode computed query memory per node
+ * @param opCount number of buffering operators in this query
+ * @return the per-operator memory
+ */
+
+ public static long computeOperatorMemory(OptionSet optionManager, long
maxAllocPerNode, int opCount) {
+ final long maxWidth =
optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
+ final double cpuLoadAverage =
optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
+ final long maxWidthPerNode =
ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpuLoadAverage, maxWidth);
--- End diff --
Good point. The queue-based memory assignment feature uses a different
model: it counts the actual operators on each node and uses the maximum
count across all nodes. For a simple one-fragment query, the difference in
the memory given to each operator is huge: the operator gets all of the
memory rather than a small slice. We might want to port that logic to the
non-queued case as well.
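
As a rough illustration of the gap between the two models, here is a
minimal sketch. It is not Drill code: the class and method names, the
2 GiB budget and the width of 16 are all hypothetical, the count-based
method is only one reading of "count the operators per node and use the
max", and the per-operator floor from the diff is left out.

```java
import java.util.Arrays;

// Hypothetical sketch; none of these names exist in Drill.
public class MemorySplitSketch {

  // Width-based model, as in the diff above: assume every buffered
  // operator runs in maxWidthPerNode fragments on each node.
  public static long widthBasedShare(long maxAllocPerNode, int opCount,
                                     long maxWidthPerNode) {
    return maxAllocPerNode / (opCount * maxWidthPerNode);
  }

  // Count-based model (assumed): opsPerNode[i] is the number of buffered
  // operator instances actually planned on node i; size each operator's
  // share for the busiest node.
  public static long countBasedShare(long maxAllocPerNode, int[] opsPerNode) {
    int maxOps = Arrays.stream(opsPerNode).max().orElse(1);
    return maxAllocPerNode / Math.max(1, maxOps);
  }

  public static void main(String[] args) {
    long perNode = 2L * 1024 * 1024 * 1024; // hypothetical 2 GiB per query per node
    long width = 16;                        // hypothetical max width per node

    // One-fragment query with a single sort on a single node.
    System.out.println("width-based: " + widthBasedShare(perNode, 1, width));
    System.out.println("count-based: " + countBasedShare(perNode, new int[] {1}));
  }
}
```

With these made-up numbers the width-based split gives the lone sort 1/16
of the per-node budget, while the count-based split gives it the whole
budget.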
---