DRILL-6030: Managed sort should minimize number of batches in a k-way merge
This closes #1075 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dcaac1b3 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dcaac1b3 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dcaac1b3 Branch: refs/heads/master Commit: dcaac1b37a2df5a52cdb2d86bf78926488fcca64 Parents: fa2005e Author: Vlad Rozov <[email protected]> Authored: Sun Dec 17 09:25:55 2017 -0800 Committer: Parth Chandra <[email protected]> Committed: Thu Jan 11 17:20:06 2018 -0800 ---------------------------------------------------------------------- .../exec/physical/impl/xsort/managed/SortConfig.java | 1 + .../physical/impl/xsort/managed/SortMemoryManager.java | 8 ++++---- exec/java-exec/src/main/resources/drill-module.conf | 4 ++-- .../impl/xsort/managed/TestExternalSortInternals.java | 12 +++++------- 4 files changed, 12 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java index 8ae3998..236c2f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java @@ -42,6 +42,7 @@ public class SortConfig { public static final int MIN_SPILL_BATCH_SIZE = 256 * 1024; public static final int MIN_MERGE_BATCH_SIZE = 256 * 1024; + public static final int DEFAULT_MERGE_LIMIT = 128; public static final int MIN_MERGE_LIMIT = 2; private final long maxMemory; http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java index 6c7ce20..68b546b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java @@ -687,11 +687,11 @@ public class SortMemoryManager { spillBatchSize.maxBufferSize); memMergeLimit = Math.max(0, memMergeLimit); - // If batches are in memory, and we need more memory to merge - // them all than is actually available, then spill some in-memory - // batches. + // If batches are in memory, and final merge count will exceed + // merge limit or we need more memory to merge them all than is + // actually available, then spill some in-memory batches. - if (inMemCount > 0 && memMergeLimit < spilledRunsCount) { + if (inMemCount > 0 && ((inMemCount + spilledRunsCount) > config.mergeLimit() || memMergeLimit < spilledRunsCount)) { return new MergeTask(MergeAction.SPILL, 0); } http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index c6a7203..8ac8d7b 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -260,10 +260,10 @@ drill.exec: { // value provided by Foreman. Primarily for testing. // 0 = unlimited, Supports HOCON memory suffixes. mem_limit: 0, - // Limit on the number of spilled batches that can be merged in + // Limit on the number of batches that can be merged in // a single pass. Limits the number of open file handles. // 0 = unlimited - merge_limit: 0, + merge_limit: 128, spill: { // Deprecated for managed xsort; used only by legacy xsort group.size: 40000, http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java index 9c31cde..e913c39 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java @@ -47,7 +47,7 @@ public class TestExternalSortInternals extends DrillTest { // Zero means no artificial limit assertEquals(0, sortConfig.maxMemory()); // Zero mapped to large number - assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit()); + assertEquals(SortConfig.DEFAULT_MERGE_LIMIT, sortConfig.mergeLimit()); // Default size: 256 MiB assertEquals(256 * ONE_MEG, sortConfig.spillFileSize()); // Default size: 1 MiB @@ -622,14 +622,12 @@ public class TestExternalSortInternals extends DrillTest { int spillRunCount = mergeLimitConstraint; long allocMemory = batchSize * memBatchCount; MergeTask task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount); - assertEquals(MergeAction.NONE, task.action); + assertEquals(MergeAction.SPILL, task.action); - // One more run than can merge in one go. But, we have plenty of - // memory to merge and hold the in-memory batches. So, just merge. + // too many to merge, spill - task = memManager.consolidateBatches(allocMemory, memBatchCount, spillRunCount + 1); - assertEquals(MergeAction.MERGE, task.action); - assertEquals(2, task.count); + task = memManager.consolidateBatches(allocMemory, 1, spillRunCount); + assertEquals(MergeAction.SPILL, task.action); // One more runs than can merge in one go, intermediate merge
