DRILL-6030: Managed sort should minimize number of batches in a k-way merge

This closes #1075


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/dcaac1b3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/dcaac1b3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/dcaac1b3

Branch: refs/heads/master
Commit: dcaac1b37a2df5a52cdb2d86bf78926488fcca64
Parents: fa2005e
Author: Vlad Rozov <[email protected]>
Authored: Sun Dec 17 09:25:55 2017 -0800
Committer: Parth Chandra <[email protected]>
Committed: Thu Jan 11 17:20:06 2018 -0800

----------------------------------------------------------------------
 .../exec/physical/impl/xsort/managed/SortConfig.java    |  1 +
 .../physical/impl/xsort/managed/SortMemoryManager.java  |  8 ++++----
 exec/java-exec/src/main/resources/drill-module.conf     |  4 ++--
 .../impl/xsort/managed/TestExternalSortInternals.java   | 12 +++++-------
 4 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
index 8ae3998..236c2f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -42,6 +42,7 @@ public class SortConfig {
   public static final int MIN_SPILL_BATCH_SIZE = 256 * 1024;
   public static final int MIN_MERGE_BATCH_SIZE = 256 * 1024;
 
+  public static final int DEFAULT_MERGE_LIMIT = 128;
   public static final int MIN_MERGE_LIMIT = 2;
 
   private final long maxMemory;

http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
index 6c7ce20..68b546b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
@@ -687,11 +687,11 @@ public class SortMemoryManager {
                                 spillBatchSize.maxBufferSize);
     memMergeLimit = Math.max(0, memMergeLimit);
 
-    // If batches are in memory, and we need more memory to merge
-    // them all than is actually available, then spill some in-memory
-    // batches.
+    // If batches are in memory, and final merge count will exceed
+    // merge limit or we need more memory to merge them all than is
+    // actually available, then spill some in-memory batches.
 
-    if (inMemCount > 0  &&  memMergeLimit < spilledRunsCount) {
+    if (inMemCount > 0  &&  ((inMemCount + spilledRunsCount) > 
config.mergeLimit() || memMergeLimit < spilledRunsCount)) {
       return new MergeTask(MergeAction.SPILL, 0);
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c6a7203..8ac8d7b 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -260,10 +260,10 @@ drill.exec: {
       // value provided by Foreman. Primarily for testing.
       // 0 = unlimited, Supports HOCON memory suffixes.
       mem_limit: 0,
-      // Limit on the number of spilled batches that can be merged in
+      // Limit on the number of batches that can be merged in
       // a single pass. Limits the number of open file handles.
       // 0 = unlimited
-      merge_limit: 0,
+      merge_limit: 128,
       spill: {
         // Deprecated for managed xsort; used only by legacy xsort
         group.size: 40000,

http://git-wip-us.apache.org/repos/asf/drill/blob/dcaac1b3/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
index 9c31cde..e913c39 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
@@ -47,7 +47,7 @@ public class TestExternalSortInternals extends DrillTest {
     // Zero means no artificial limit
     assertEquals(0, sortConfig.maxMemory());
     // Zero mapped to large number
-    assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit());
+    assertEquals(SortConfig.DEFAULT_MERGE_LIMIT, sortConfig.mergeLimit());
     // Default size: 256 MiB
     assertEquals(256 * ONE_MEG, sortConfig.spillFileSize());
     // Default size: 1 MiB
@@ -622,14 +622,12 @@ public class TestExternalSortInternals extends DrillTest {
     int spillRunCount = mergeLimitConstraint;
     long allocMemory = batchSize * memBatchCount;
     MergeTask task = memManager.consolidateBatches(allocMemory, memBatchCount, 
spillRunCount);
-    assertEquals(MergeAction.NONE, task.action);
+    assertEquals(MergeAction.SPILL, task.action);
 
-    // One more run than can merge in one go. But, we have plenty of
-    // memory to merge and hold the in-memory batches. So, just merge.
+    // too many to merge, spill
 
-    task = memManager.consolidateBatches(allocMemory, memBatchCount, 
spillRunCount + 1);
-    assertEquals(MergeAction.MERGE, task.action);
-    assertEquals(2, task.count);
+    task = memManager.consolidateBatches(allocMemory, 1, spillRunCount);
+    assertEquals(MergeAction.SPILL, task.action);
 
     // One more runs than can merge in one go, intermediate merge
 

Reply via email to