[ https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849329#comment-15849329 ]

ASF GitHub Bot commented on DRILL-5080:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99036541
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
 ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.util.Queue;
    +
    +import javax.inject.Named;
    +
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BaseAllocator;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.hadoop.util.IndexedSortable;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Queues;
    +
    +import io.netty.buffer.DrillBuf;
    +
    +public abstract class MSortTemplate implements MSorter, IndexedSortable {
    +//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
    +
    +  private SelectionVector4 vector4; // set in setup() to a wrapper created from the caller's SV4
    +  private SelectionVector4 aux; // scratch SV4 allocated in setup(); merge() writes its output here
    +  @SuppressWarnings("unused")
    +  private long compares; // NOTE(review): marked unused; confirm whether anything still updates it
    +
    +  /**
    +   * Holds offsets into the SV4 of the start of each batch
    +   * (sorted run). setup() seeds it with 0 and then adds the index
    +   * of the first entry of every subsequent batch.
    +   */
    +
    +  private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
    +  private FragmentContext context; // saved by setup(); sort() checks shouldContinue() on it
    +
    +  /**
    +   * Controls the maximum size of batches exposed to downstream.
    +   * setup() caps it at Character.MAX_VALUE and at the total record count.
    +   */
    +  private int desiredRecordBatchCount;
    +  /** Prepares a sort: wraps the caller's SV4, records the start offset of each batch (run), and allocates the aux SV4 that merge() writes into. */
    +  @Override
    +  public void setup(final FragmentContext context, final BufferAllocator 
allocator, final SelectionVector4 vector4,
    +                    final VectorContainer hyperBatch, int outputBatchSize) 
throws SchemaChangeException{
    +    // we pass in the local hyperBatch since that is where we'll be 
reading data.
    +    Preconditions.checkNotNull(vector4);
    +    this.vector4 = vector4.createNewWrapperCurrent(); // NOTE(review): presumably hands the caller's SV4 data to a new wrapper so the caller's copy can be cleared below — confirm
    +    this.context = context;
    +    vector4.clear();
    +    doSetup(context, hyperBatch, null);
    +
    +    // Populate the queue with the offset in the SV4 of each
    +    // batch, i.e. the index of the batch's first entry. This is expensive
    +    // as it requires a scan of all items to be sorted: potentially millions.
    +
    +    runStarts.add(0); // the first run always starts at offset 0
    +    int batch = 0;
    +    final int totalCount = this.vector4.getTotalCount();
    +    for (int i = 0; i < totalCount; i++) {
    +      final int newBatch = this.vector4.get(i) >>> 16; // upper 16 bits of an SV4 entry hold the batch index
    +      if (newBatch == batch) {
    +        continue;
    +      } else if (newBatch == batch + 1) { // entry i is the first entry of the next batch
    +        runStarts.add(i);
    +        batch = newBatch;
    +      } else { // batch indexes are not consecutive and ascending, e.g. a batch with no entries was skipped
    +        throw new UnsupportedOperationException(String.format("Missing 
batch. batch: %d newBatch: %d", batch, newBatch));
    +      }
    +    }
    +
    +    // Create a temporary SV4 (4 bytes per entry) to hold the merged results.
    +    // NOTE(review): 4 * totalCount is int arithmetic and overflows once totalCount exceeds 536,870,911.
    +    @SuppressWarnings("resource")
    +    final DrillBuf drillBuf = allocator.buffer(4 * totalCount); // presumably owned by aux from here on — confirm who releases it
    +    desiredRecordBatchCount = Math.min(outputBatchSize, 
Character.MAX_VALUE); // cap at Character.MAX_VALUE (65535)
    +    desiredRecordBatchCount = Math.min(desiredRecordBatchCount, 
totalCount); // never larger than the number of records being sorted
    +    aux = new SelectionVector4(drillBuf, totalCount, 
desiredRecordBatchCount);
    +  }
    +
    +  /**
    +   * Returns how much memory the MSorter needs for its own purposes for the
    +   * given record count. This is used in ExternalSortBatch to make decisions
    +   * about whether to spill or not.
    +   *
    +   * @param recordCount number of records to be sorted
    +   * @return bytes for one 4-byte SV4 entry per record, as rounded by
    +   *         BaseAllocator.nextPowerOfTwo
    +   */
    +  public static long memoryNeeded(final int recordCount) {
    +    // We need 4 bytes (SV4) for each record.
    +    // The memory allocator will round this to the next
    +    // power of 2. NOTE(review): recordCount * 4 is int arithmetic and overflows above 536,870,911 records.
    +
    +    return BaseAllocator.nextPowerOfTwo(recordCount * 4);
    +  }
    +
    +  /**
    +   * Given two adjacent regions within the selection vector 4 (a left and a
    +   * right), merge the two regions to produce a combined output region in the
    +   * auxiliary selection vector. The left region is [leftStart, rightStart)
    +   * and the right region is [rightStart, rightEnd); each is assumed to be
    +   * sorted already. When compare() reports a tie, the left entry is copied
    +   * first, so the merge is stable.
    +   *
    +   * @param leftStart SV4 index of the first entry of the left region
    +   * @param rightStart SV4 index of the first entry of the right region; also
    +   *        one past the last entry of the left region
    +   * @param rightEnd SV4 index one past the last entry of the right region
    +   * @param outStart index in the auxiliary SV4 at which output begins
    +   * @return index in the auxiliary SV4 one past the last entry written
    +   */
    +  protected int merge(final int leftStart, final int rightStart, final int 
rightEnd, final int outStart) {
    +    int l = leftStart;
    +    int r = rightStart;
    +    int o = outStart;
    +    while (l < rightStart && r < rightEnd) { // copy whichever head entry compare() orders first
    +      if (compare(l, r) <= 0) {
    +        aux.set(o++, vector4.get(l++));
    +      } else {
    +        aux.set(o++, vector4.get(r++));
    +      }
    +    }
    +    while (l < rightStart) { // copy what remains of the left region
    +      aux.set(o++, vector4.get(l++));
    +    }
    +    while (r < rightEnd) { // copy what remains of the right region
    +      aux.set(o++, vector4.get(r++));
    +    }
    +    assert o == outStart + (rightEnd - leftStart); // one output entry per input entry
    +    return o;
    +  }
    +  /** Returns the SV4 currently held in the vector4 field (first assigned in setup()). */
    +  @Override
    +  public SelectionVector4 getSV4() {
    +    return vector4;
    +  }
    +
    +  /**
    +   * Sort (really, merge) a set of pre-sorted runs to produce a combined
    +   * result set. Merging is done in the selection vector, record data does
    +   * not move.
    +   * <p>
    +   * Runs are merge pairwise in multiple passes, providing performance
    +   * of O(n * m * log(n)), where n = number of runs, m = number of records
    +   * per run.
    +   */
    +
    +  @Override
    +  public void sort(final VectorContainer container) {
    +    while (runStarts.size() > 1) {
    +      final int totalCount = this.vector4.getTotalCount();
    +
    +      // check if we're cancelled/failed recently
    +      if (!context.shouldContinue()) {
    +        return; }
    +
    +      int outIndex = 0;
    +      final Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
    +      newRunStarts.add(outIndex);
    +      final int size = runStarts.size();
    +      for (int i = 0; i < size / 2; i++) {
    --- End diff --
    
    What happens when "size" is odd? How is the last run handled? Maybe add a comment to explain.



> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
>                 Key: DRILL-5080
>                 URL: https://issues.apache.org/jira/browse/DRILL-5080
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.8.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>             Fix For: 1.10.0
>
>         Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that
> works to a clearly defined memory limit. Attached is a design specification
> for the work.
> The project will include fixing a number of bugs related to the external
> sort, included as sub-tasks of this umbrella task.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to