[ 
https://issues.apache.org/jira/browse/DRILL-5325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027101#comment-16027101
 ] 

ASF GitHub Bot commented on DRILL-5325:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/808#discussion_r118794047
  
    --- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestExternalSortInternals.java
 ---
    @@ -0,0 +1,632 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +import org.apache.drill.common.config.DrillConfig;
    +import org.apache.drill.exec.ExecConstants;
    +import 
org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeAction;
    +import 
org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
    +import org.apache.drill.test.ConfigBuilder;
    +import org.apache.drill.test.DrillTest;
    +import org.apache.drill.test.OperatorFixture;
    +import org.junit.Test;
    +
    +public class TestExternalSortInternals extends DrillTest {
    +
    +  private static final int ONE_MEG = 1024 * 1024;
    +
    +  /**
    +   * Verify defaults configured in drill-override.conf.
    +   */
    +  @Test
    +  public void testConfigDefaults() {
    +    DrillConfig drillConfig = DrillConfig.create();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    // Zero means no artificial limit
    +    assertEquals(0, sortConfig.maxMemory());
    +    // Zero mapped to large number
    +    assertEquals(Integer.MAX_VALUE, sortConfig.mergeLimit());
    +    // Default size: 256 MiB
    +    assertEquals(256 * ONE_MEG, sortConfig.spillFileSize());
    +    // Default size: 8 MiB
    +    assertEquals(8 * ONE_MEG, sortConfig.spillBatchSize());
    +    // Default size: 16 MiB
    +    assertEquals(16 * ONE_MEG, sortConfig.mergeBatchSize());
    +    // Default: unlimited
    +    assertEquals(Integer.MAX_VALUE, sortConfig.getBufferedBatchLimit());
    +  }
    +
    +  /**
    +   * Verify that the various constants do, in fact, map to the
    +   * expected properties, and that the properties are overridden.
    +   */
    +  @Test
    +  public void testConfigOverride() {
    +    // Verify the various HOCON ways of setting memory
    +    DrillConfig drillConfig = new ConfigBuilder()
    +        .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, "2000K")
    +        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 10)
    +        .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, "10M")
    +        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 500_000)
    +        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 600_000)
    +        .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 50)
    +        .build();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    assertEquals(2000 * 1024, sortConfig.maxMemory());
    +    assertEquals(10, sortConfig.mergeLimit());
    +    assertEquals(10 * ONE_MEG, sortConfig.spillFileSize());
    +    assertEquals(500_000, sortConfig.spillBatchSize());
    +    assertEquals(600_000, sortConfig.mergeBatchSize());
    +    assertEquals(50, sortConfig.getBufferedBatchLimit());
    +  }
    +
    +  /**
    +   * Some properties have hard-coded limits. Verify these limits.
    +   */
    +  @Test
    +  public void testConfigLimits() {
    +    DrillConfig drillConfig = new ConfigBuilder()
    +        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, 
SortConfig.MIN_MERGE_LIMIT - 1)
    +        .put(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE, 
SortConfig.MIN_SPILL_FILE_SIZE - 1)
    +        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 
SortConfig.MIN_SPILL_BATCH_SIZE - 1)
    +        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 
SortConfig.MIN_MERGE_BATCH_SIZE - 1)
    +        .put(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 1)
    +        .build();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    assertEquals(SortConfig.MIN_MERGE_LIMIT, sortConfig.mergeLimit());
    +    assertEquals(SortConfig.MIN_SPILL_FILE_SIZE, 
sortConfig.spillFileSize());
    +    assertEquals(SortConfig.MIN_SPILL_BATCH_SIZE, 
sortConfig.spillBatchSize());
    +    assertEquals(SortConfig.MIN_MERGE_BATCH_SIZE, 
sortConfig.mergeBatchSize());
    +    assertEquals(2, sortConfig.getBufferedBatchLimit());
    +  }
    +
    +  @Test
    +  public void testMemoryManagerBasics() {
    +    DrillConfig drillConfig = DrillConfig.create();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    long memoryLimit = 50 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    // Basic setup
    +
    +    assertEquals(sortConfig.spillBatchSize(), 
memManager.getPreferredSpillBatchSize());
    +    assertEquals(sortConfig.mergeBatchSize(), 
memManager.getPreferredMergeBatchSize());
    +    assertEquals(memoryLimit, memManager.getMemoryLimit());
    +
    +    // Nice simple batch: 6 MB in size, 300 byte rows, vectors half full
    +    // so 10000 rows. Sizes chosen so that spill and merge batch record
    +    // stay below the limit of 64K.
    +
    +    int rowWidth = 300;
    +    int rowCount = 10000;
    +    int batchSize = rowWidth * rowCount * 2;
    +
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, 
rowCount);
    +
    +    // Zero rows - no update
    +
    +    memManager.updateEstimates(batchSize, rowWidth, 0);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +
    +    // Larger batch size, update batch size
    +
    +    rowCount = 20000;
    +    batchSize = rowWidth * rowCount * 2;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, 
rowCount);
    +
    +    // Smaller batch size: no change
    +
    +    rowCount = 5000;
    +    int lowBatchSize = rowWidth * rowCount * 2;
    +    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +
    +    // Different batch density, update batch size
    +
    +    rowCount = 10000;
    +    batchSize = rowWidth * rowCount * 5;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, 
rowCount);
    +
    +    // Smaller row size, no update
    +
    +    int lowRowWidth = 200;
    +    rowCount = 10000;
    +    lowBatchSize = rowWidth * rowCount * 2;
    +    memManager.updateEstimates(lowBatchSize, lowRowWidth, rowCount);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +
    +    // Larger row size, updates calcs
    +
    +    rowWidth = 400;
    +    rowCount = 10000;
    +    lowBatchSize = rowWidth * rowCount * 2;
    +    memManager.updateEstimates(lowBatchSize, rowWidth, rowCount);
    +    verifyCalcs(sortConfig, memoryLimit, memManager, batchSize, rowWidth, 
rowCount);
    +
    +    // EOF: very low density
    +
    +    memManager.updateEstimates(lowBatchSize, rowWidth, 5);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +  }
    +
    +  private void verifyCalcs(SortConfig sortConfig, long memoryLimit, 
SortMemoryManager memManager, int batchSize,
    +      int rowWidth, int rowCount) {
    +
    +    assertFalse(memManager.mayOverflow());
    +
    +    // Row and batch sizes should be exact
    +
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +
    +    // Spill sizes will be rounded, but within reason.
    +
    +    int count = sortConfig.spillBatchSize() / rowWidth;
    +    assertTrue(count >= memManager.getSpillBatchRowCount());
    +    assertTrue(count/2 <= memManager.getSpillBatchRowCount());
    +    int spillSize = memManager.getSpillBatchRowCount() * rowWidth;
    +    assertTrue(spillSize <= memManager.getSpillBatchSize());
    +    assertTrue(spillSize >= memManager.getSpillBatchSize()/2);
    +    assertEquals(memoryLimit - memManager.getSpillBatchSize(), 
memManager.getBufferMemoryLimit());
    +
    +    // Merge sizes will also be rounded, within reason.
    +
    +    count = sortConfig.mergeBatchSize() / rowWidth;
    +    assertTrue(count >= memManager.getMergeBatchRowCount());
    +    assertTrue(count/2 <= memManager.getMergeBatchRowCount());
    +    int mergeSize = memManager.getMergeBatchRowCount() * rowWidth;
    +    assertTrue(mergeSize <= memManager.getMergeBatchSize());
    +    assertTrue(mergeSize >= memManager.getMergeBatchSize()/2);
    +    assertEquals(memoryLimit - memManager.getMergeBatchSize(), 
memManager.getMergeMemoryLimit());
    +  }
    +
    +  @Test
    +  public void testSmallRows() {
    +    DrillConfig drillConfig = DrillConfig.create();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    long memoryLimit = 100 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    // Zero-length row, round to 10
    +
    +    int rowWidth = 0;
    +    int rowCount = 10000;
    +    int batchSize = rowCount * 2;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    assertEquals(10, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +
    +    // Truncate spill, merge batch row count
    +
    +    assertEquals(Character.MAX_VALUE, memManager.getSpillBatchRowCount());
    +    assertEquals(Character.MAX_VALUE, memManager.getMergeBatchRowCount());
    +
    +    // But leave batch sizes at their defaults
    +
    +    assertEquals(sortConfig.spillBatchSize(), 
memManager.getPreferredSpillBatchSize());
    +    assertEquals(sortConfig.mergeBatchSize(), 
memManager.getPreferredMergeBatchSize());
    +
    +    // Small, but non-zero, row
    +
    +    rowWidth = 20;
    +    rowCount = 10000;
    +    batchSize = rowWidth * rowCount;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +
    +    // Truncate spill, merge batch row count
    +
    +    assertEquals(Character.MAX_VALUE, memManager.getSpillBatchRowCount());
    +    assertEquals(Character.MAX_VALUE, memManager.getMergeBatchRowCount());
    +
    +    // But leave batch sizes at their defaults
    +
    +    assertEquals(sortConfig.spillBatchSize(), 
memManager.getPreferredSpillBatchSize());
    +    assertEquals(sortConfig.mergeBatchSize(), 
memManager.getPreferredMergeBatchSize());
    +  }
    +
    +  @Test
    +  public void testLowMemory() {
    +    DrillConfig drillConfig = DrillConfig.create();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    long memoryLimit = 10 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    // Tight squeeze, but can be made to work.
    +    // Input batches are a quarter of memory.
    +
    +    int rowWidth = 1000;
    +    int rowCount = (int) (memoryLimit / 4 / rowWidth);
    +    int batchSize = rowCount * rowWidth;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +    assertFalse(memManager.mayOverflow());
    +
    +    // Spill, merge batches should be constrained
    +
    +    int spillBatchSize = memManager.getSpillBatchSize();
    +    assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
    +    assertTrue(spillBatchSize >= rowWidth);
    +    assertTrue(spillBatchSize <= memoryLimit / 3);
    +    assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
    +    assertTrue(spillBatchSize / rowWidth >= 
memManager.getSpillBatchRowCount());
    +
    +    int mergeBatchSize = memManager.getMergeBatchSize();
    +    assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
    +    assertTrue(mergeBatchSize >= rowWidth);
    +    assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
    +    assertTrue(mergeBatchSize / rowWidth >= 
memManager.getMergeBatchRowCount());
    +
    +    // Should spill after just two batches
    +
    +    assertFalse(memManager.isSpillNeeded(0, batchSize));
    +    assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
    +    assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
    +
    +    // Tighter squeeze, but can be made to work.
    +    // Input batches are 3/8 of memory; two fill 3/4,
    +    // but small spill and merge batches allow progress.
    +
    +    rowWidth = 1000;
    +    rowCount = (int) (memoryLimit * 3 / 8 / rowWidth);
    +    batchSize = rowCount * rowWidth;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +    assertFalse(memManager.mayOverflow());
    +
    +    // Spill, merge batches should be constrained
    +
    +    spillBatchSize = memManager.getSpillBatchSize();
    +    assertTrue(spillBatchSize < memManager.getPreferredSpillBatchSize());
    +    assertTrue(spillBatchSize >= rowWidth);
    +    assertTrue(spillBatchSize <= memoryLimit / 3);
    +    assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
    +    assertTrue(memManager.getSpillBatchRowCount() > 1);
    +    assertTrue(spillBatchSize / rowWidth >= 
memManager.getSpillBatchRowCount());
    +
    +    mergeBatchSize = memManager.getMergeBatchSize();
    +    assertTrue(mergeBatchSize < memManager.getPreferredMergeBatchSize());
    +    assertTrue(mergeBatchSize >= rowWidth);
    +    assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
    +    assertTrue(memManager.getMergeBatchRowCount() > 1);
    +    assertTrue(mergeBatchSize / rowWidth >= 
memManager.getMergeBatchRowCount());
    +  }
    +
    +  @Test
    +  public void testExtremeLowMemory() {
    +    DrillConfig drillConfig = DrillConfig.create();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    long memoryLimit = 10 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    // Jumbo row size, works with one row per batch. Minimum is to have two
    +    // input rows and a spill row, or two spill rows and a merge row.
    +    // Have to back off the exact size a bit to allow for internal 
fragmentation
    +    // in the merge and output batches.
    +
    +    int rowWidth = (int) (memoryLimit / 3 * 0.75);
    +    int rowCount = 1;
    +    int batchSize = rowWidth;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    assertEquals(rowWidth, memManager.getRowWidth());
    +    assertEquals(batchSize, memManager.getInputBatchSize());
    +    assertFalse(memManager.mayOverflow());
    +
    +    int spillBatchSize = memManager.getSpillBatchSize();
    +    assertTrue(spillBatchSize >= rowWidth);
    +    assertTrue(spillBatchSize <= memoryLimit / 3);
    +    assertTrue(spillBatchSize + 2 * batchSize <= memoryLimit);
    +    assertEquals(1, memManager.getSpillBatchRowCount());
    +
    +    int mergeBatchSize = memManager.getMergeBatchSize();
    +    assertTrue(mergeBatchSize >= rowWidth);
    +    assertTrue(mergeBatchSize + 2 * spillBatchSize <= memoryLimit);
    +    assertEquals(1, memManager.getMergeBatchRowCount());
    +
    +    // Should spill after just two rows
    +
    +    assertFalse(memManager.isSpillNeeded(0, batchSize));
    +    assertFalse(memManager.isSpillNeeded(batchSize, batchSize));
    +    assertTrue(memManager.isSpillNeeded(2 * batchSize, batchSize));
    +
    +    // In trouble now, can't fit even three rows.
    +
    +    rowWidth = (int) (memoryLimit / 2);
    +    rowCount = 1;
    +    batchSize = rowWidth;
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    assertTrue(memManager.mayOverflow());
    +  }
    +
    +  @Test
    +  public void testConfigConstraints() {
    +    int memConstaint = 40 * ONE_MEG;
    +    int batchSizeConstaint = ONE_MEG / 2;
    +    int mergeSizeConstaint = ONE_MEG;
    +    DrillConfig drillConfig = new ConfigBuilder()
    +        .put(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, memConstaint)
    +        .put(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE, 
batchSizeConstaint)
    +        .put(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE, 
mergeSizeConstaint)
    +        .build();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    long memoryLimit = 50 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    assertEquals(batchSizeConstaint, 
memManager.getPreferredSpillBatchSize());
    +    assertEquals(mergeSizeConstaint, 
memManager.getPreferredMergeBatchSize());
    +    assertEquals(memConstaint, memManager.getMemoryLimit());
    +
    +    int rowWidth = 300;
    +    int rowCount = 10000;
    +    int batchSize = rowWidth * rowCount * 2;
    +
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    verifyCalcs(sortConfig, memConstaint, memManager, batchSize, rowWidth, 
rowCount);
    +  }
    +
    +  @Test
    +  public void testMemoryDynamics() {
    +    DrillConfig drillConfig = DrillConfig.create();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    long memoryLimit = 50 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    int rowWidth = 300;
    +    int rowCount = 10000;
    +    int batchSize = rowWidth * rowCount * 2;
    +
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +
    +    int spillBatchSize = memManager.getSpillBatchSize();
    +
    +    // Test various memory fill levels
    +
    +    assertFalse(memManager.isSpillNeeded(0, batchSize));
    +    assertFalse(memManager.isSpillNeeded(2 * batchSize, batchSize));
    +    assertTrue(memManager.isSpillNeeded(memoryLimit - spillBatchSize + 1, 
batchSize));
    +
    +    // Similar, but for an in-memory merge
    +
    +    assertTrue(memManager.hasMemoryMergeCapacity(memoryLimit - ONE_MEG, 
ONE_MEG - 1));
    +    assertTrue(memManager.hasMemoryMergeCapacity(memoryLimit - ONE_MEG, 
ONE_MEG));
    +    assertFalse(memManager.hasMemoryMergeCapacity(memoryLimit - ONE_MEG, 
ONE_MEG + 1));
    +  }
    +
    +  @Test
    +  public void testMergeCalcs() {
    +
    +    // No artificial merge limit
    +
    +    int mergeLimitConstraint = 100;
    +    DrillConfig drillConfig = new ConfigBuilder()
    +        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
    +        .build();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    // Allow four spill batches, 8 MB each, plus one output of 16
    +    long memoryLimit = 50 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    // Prime the estimates
    +
    +    int rowWidth = 300;
    +    int rowCount = 10000;
    +    int batchSize = rowWidth * rowCount * 2;
    +
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +    int spillBatchSize = memManager.getSpillBatchSize();
    +    int mergeBatchSize = memManager.getMergeBatchSize();
    +
    +    // One in-mem batch, no merging.
    +
    +    long allocMemory = memoryLimit - mergeBatchSize;
    +    MergeTask task = memManager.consolidateBatches(allocMemory, 1, 0);
    +    assertEquals(MergeAction.NONE, task.action);
    +
    +    // Many in-mem batches, just enough to merge
    +
    +    allocMemory = memoryLimit - mergeBatchSize;
    +    int memBatches = (int) (allocMemory / batchSize);
    +    allocMemory = memBatches * batchSize;
    +    task = memManager.consolidateBatches(allocMemory, memBatches, 0);
    +    assertEquals(MergeAction.NONE, task.action);
    +
    +    // Spills if no room for spill and in-memory batches
    +
    +    task = memManager.consolidateBatches(allocMemory, memBatches, 1);
    +    assertEquals(MergeAction.SPILL, task.action);
    +
    +    // One more in-mem batch: now needs to spill
    +
    +    memBatches++;
    +    allocMemory = memBatches * batchSize;
    +    task = memManager.consolidateBatches(allocMemory, memBatches, 0);
    +    assertEquals(MergeAction.SPILL, task.action);
    +
    +    // No spill for various in-mem/spill run combinations
    +
    +    allocMemory = memoryLimit - spillBatchSize - mergeBatchSize;
    +    memBatches = (int) (allocMemory / batchSize);
    +    allocMemory = memBatches * batchSize;
    +    task = memManager.consolidateBatches(allocMemory, memBatches, 1);
    +    assertEquals(MergeAction.NONE, task.action);
    +
    +    allocMemory = memoryLimit - 2 * spillBatchSize - mergeBatchSize;
    +    memBatches = (int) (allocMemory / batchSize);
    +    allocMemory = memBatches * batchSize;
    +    task = memManager.consolidateBatches(allocMemory, memBatches, 2);
    +    assertEquals(MergeAction.NONE, task.action);
    +
    +    // No spill if no in-memory, only spill, and spill fits
    +
    +    long freeMem = memoryLimit - mergeBatchSize;
    +    int spillBatches = (int) (freeMem / spillBatchSize);
    +    task = memManager.consolidateBatches(0, 0, spillBatches);
    +    assertEquals(MergeAction.NONE, task.action);
    +
    +    // One more and must merge
    +
    +    task = memManager.consolidateBatches(0, 0, spillBatches + 1);
    +    assertEquals(MergeAction.MERGE, task.action);
    +    assertEquals(2, task.count);
    +
    +    // Two more and will merge more
    +
    +    task = memManager.consolidateBatches(0, 0, spillBatches + 2);
    +    assertEquals(MergeAction.MERGE, task.action);
    +    assertEquals(3, task.count);
    +  }
    +
    +  @Test
    +  public void testMergeLimit() {
    +    // Constrain merge width
    +    int mergeLimitConstraint = 5;
    +    DrillConfig drillConfig = new ConfigBuilder()
    +        .put(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT, mergeLimitConstraint)
    +        .build();
    +    SortConfig sortConfig = new SortConfig(drillConfig);
    +    // Plenty of memory, memory will not be a limit
    +    long memoryLimit = 400 * ONE_MEG;
    +    SortMemoryManager memManager = new SortMemoryManager(sortConfig, 
memoryLimit);
    +
    +    // Prime the estimates
    +
    +    int rowWidth = 300;
    +    int rowCount = 10000;
    +    int batchSize = rowWidth * rowCount * 2;
    +
    +    memManager.updateEstimates(batchSize, rowWidth, rowCount);
    +
    +    // Pretend merge limit runs, additional in-memory batches
    +
    +    int memBatchCount = 10;
    +    int spillRunCount = mergeLimitConstraint;
    +    long allocMemory = batchSize * memBatchCount;
    +    MergeTask task = memManager.consolidateBatches(allocMemory, 
memBatchCount, spillRunCount);
    +    assertEquals(MergeAction.NONE, task.action);
    +
    +    // One more run than can merge in one go. But, we have plenty of
    +    // memory to merge and hold the in-memory batches. So, just merge.
    +
    +    task = memManager.consolidateBatches(allocMemory, memBatchCount, 
spillRunCount + 1);
    +    assertEquals(MergeAction.MERGE, task.action);
    +    assertEquals(2, task.count);
    +
    +    // One more runs than can merge in one go, intermediate merge
    +
    +    task = memManager.consolidateBatches(0, 0, spillRunCount + 1);
    +    assertEquals(MergeAction.MERGE, task.action);
    +    assertEquals(2, task.count);
    +
    +    // Two more spill runs, merge three
    +
    +    task = memManager.consolidateBatches(0, 0, spillRunCount + 2);
    +    assertEquals(MergeAction.MERGE, task.action);
    +    assertEquals(3, task.count);
    +
    +    // Way more than can merge, limit to the constraint
    +
    +    task = memManager.consolidateBatches(0, 0, spillRunCount * 3);
    +    assertEquals(MergeAction.MERGE, task.action);
    +    assertEquals(mergeLimitConstraint, task.count);
    +  }
    +
    +  @Test
    +  public void testMetrics() {
    +    OperatorFixture.MockStats stats = new OperatorFixture.MockStats();
    +    SortMetrics metrics = new SortMetrics(stats);
    +
    +    // Input stats
    +
    +    metrics.updateInputMetrics(100, 10_000);
    +    assertEquals(1, metrics.getInputBatchCount());
    +    assertEquals(100, metrics.getInputRowCount());
    +    assertEquals(10_000, metrics.getInputBytes());
    +
    +    metrics.updateInputMetrics(200, 20_000);
    +    assertEquals(2, metrics.getInputBatchCount());
    +    assertEquals(300, metrics.getInputRowCount());
    +    assertEquals(30_000, metrics.getInputBytes());
    +
    +    // Buffer memory
    +
    +    assertEquals(0D, stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 
0.01);
    +
    +    metrics.updateMemory(1_000_000);
    +    assertEquals(1_000_000D, 
stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
    +
    +    metrics.updateMemory(2_000_000);
    +    assertEquals(1_000_000D, 
stats.getStat(ExternalSortBatch.Metric.MIN_BUFFER), 0.01);
    --- End diff --
    
    Consider adding a comment here, such as: "Trying to update to a higher 
value - MIN_BUFFER should keep the previous, smaller one." (When I first 
glanced at this code, I thought the unchanged expected value was a bug.)
     


> Implement sub-operator unit tests for managed external sort
> -----------------------------------------------------------
>
>                 Key: DRILL-5325
>                 URL: https://issues.apache.org/jira/browse/DRILL-5325
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Tools, Build & Test
>    Affects Versions: 1.11.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.11.0
>
>
> Validate the proposed sub-operator test framework by creating low-level unit 
> tests for the managed version of the external sort.
> The external sort has a small number of existing tests, but those tests are 
> quite superficial; the "managed sort" project found many bugs. The managed 
> sort itself was tested with ad-hoc system-level tests created using the new 
> "cluster fixture" framework. But, again, such tests could not reach deep 
> inside the sort code to exercise very specific conditions.
> As a result, we spent far too much time using QA functional tests to identify 
> specific code issues.
> Using the sub-operator unit test framework, we can instead test each bit of 
> functionality at the unit test level.
> If doing so works, and is practical, it can serve as a model for other 
> operator testing projects.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to