Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99246595
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
 ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.util.Queue;
    +
    +import javax.inject.Named;
    +
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BaseAllocator;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import org.apache.drill.exec.record.RecordBatch;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.selection.SelectionVector4;
    +import org.apache.hadoop.util.IndexedSortable;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Queues;
    +
    +import io.netty.buffer.DrillBuf;
    +
    +public abstract class MSortTemplate implements MSorter, IndexedSortable {
    +//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
    +
    +  /**
    +   * Selection vector being sorted. Assigned in setup() from
    +   * createNewWrapperCurrent() on the caller's SV4, read by merge()
    +   * and returned by getSV4().
    +   */
    +  private SelectionVector4 vector4;
    +  /**
    +   * Auxiliary selection vector created in setup(); merge() writes
    +   * the merged entries into it.
    +   */
    +  private SelectionVector4 aux;
    +  // Not referenced in the part of the class shown here, hence the
    +  // "unused" suppression. Presumably a comparison counter - confirm.
    +  @SuppressWarnings("unused")
    +  private long compares;
    +
    +  /**
    +   * Holds offsets into the SV4 of the start of each batch
    +   * (sorted run.) Populated by setup().
    +   */
    +
    +  private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
    +  /**
    +   * Fragment context saved by setup(); sort() calls shouldContinue()
    +   * on it at the start of each pass to detect cancellation or failure.
    +   */
    +  private FragmentContext context;
    +
    +  /**
    +   * Controls the maximum size of batches exposed to downstream.
    +   * Computed in setup() as the smallest of outputBatchSize,
    +   * Character.MAX_VALUE and the total record count.
    +   */
    +  private int desiredRecordBatchCount;
    +
    +  /**
    +   * Prepares this sorter for a call to sort().
    +   * <p>
    +   * Keeps a wrapper of the given SV4 (createNewWrapperCurrent()) and then
    +   * clears the caller's vector, runs the generated doSetup(), records in
    +   * runStarts the SV4 offset at which each batch begins, and allocates
    +   * the auxiliary SV4 that merge() writes into.
    +   * <p>
    +   * The batch index of an entry is taken from its upper 16 bits
    +   * ({@code >>> 16}). Batch indexes must start at 0 and either repeat or
    +   * increase by exactly one from one entry to the next.
    +   * <p>
    +   * NOTE(review): {@code 4 * totalCount} is evaluated in int arithmetic
    +   * and wraps around for totalCount above Integer.MAX_VALUE / 4.
    +   *
    +   * @param context fragment context, saved for the cancellation check in sort()
    +   * @param allocator allocator used for the auxiliary SV4 buffer
    +   * @param vector4 selection vector to sort; must not be null
    +   * @param hyperBatch container passed to doSetup() as the incoming data
    +   * @param outputBatchSize upper bound on the record count per output
    +   *          batch; further capped at Character.MAX_VALUE and at the
    +   *          total record count
    +   * @throws SchemaChangeException declared by this method; presumably
    +   *           raised by doSetup() - confirm
    +   * @throws UnsupportedOperationException if the batch index jumps by
    +   *           anything other than 0 or +1 between consecutive entries
    +   */
    +  @Override
    +  public void setup(final FragmentContext context, final BufferAllocator 
allocator, final SelectionVector4 vector4,
    +                    final VectorContainer hyperBatch, int outputBatchSize) 
throws SchemaChangeException{
    +    // we pass in the local hyperBatch since that is where we'll be 
reading data.
    +    Preconditions.checkNotNull(vector4);
    +    this.vector4 = vector4.createNewWrapperCurrent();
    +    this.context = context;
    +    vector4.clear();
    +    doSetup(context, hyperBatch, null);
    +
    +    // Populate the queue with the offset in the SV4 of each
    +    // batch. Note that this is expensive as it requires a scan
    +    // of all items to be sorted: potentially millions.
    +
    +    runStarts.add(0);
    +    int batch = 0;
    +    final int totalCount = this.vector4.getTotalCount();
    +    for (int i = 0; i < totalCount; i++) {
    +      final int newBatch = this.vector4.get(i) >>> 16;
    +      if (newBatch == batch) {
    +        continue;
    +      } else if (newBatch == batch + 1) {
    +        runStarts.add(i);
    +        batch = newBatch;
    +      } else {
    +        throw new UnsupportedOperationException(String.format("Missing 
batch. batch: %d newBatch: %d", batch, newBatch));
    +      }
    +    }
    +
    +    // Create a temporary SV4 to hold the merged results.
    +
    +    @SuppressWarnings("resource")
    +    final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
    +    desiredRecordBatchCount = Math.min(outputBatchSize, 
Character.MAX_VALUE);
    +    desiredRecordBatchCount = Math.min(desiredRecordBatchCount, 
totalCount);
    +    aux = new SelectionVector4(drillBuf, totalCount, 
desiredRecordBatchCount);
    +  }
    +
    +  /**
    +   * Estimates how much memory MSorter needs for its own purposes when
    +   * sorting the given number of records. This is used in
    +   * ExternalSortBatch to make decisions about whether to spill or not.
    +   * <p>
    +   * NOTE(review): {@code recordCount * 4} is evaluated in int arithmetic,
    +   * so it wraps around for recordCount above Integer.MAX_VALUE / 4.
    +   *
    +   * @param recordCount number of records to be sorted
    +   * @return 4 bytes per record, passed through
    +   *         {@code BaseAllocator.nextPowerOfTwo()}
    +   */
    +  public static long memoryNeeded(final int recordCount) {
    +    // We need 4 bytes (SV4) for each record.
    +    // The memory allocator will round this to the next
    +    // power of 2.
    +
    +    return BaseAllocator.nextPowerOfTwo(recordCount * 4);
    +  }
    +
    +  /**
    +   * Given two adjacent regions within the selection vector 4 (a left and
    +   * a right), merge the two regions to produce a combined output region
    +   * in the auxiliary selection vector. When compare() reports a tie, the
    +   * entry from the left region is written first.
    +   *
    +   * @param leftStart index of the first entry of the left region
    +   * @param rightStart index of the first entry of the right region; the
    +   *          left region ends just before it
    +   * @param rightEnd index one past the last entry of the right region
    +   * @param outStart index in the auxiliary vector at which writing starts
    +   * @return index one past the last entry written to the auxiliary vector,
    +   *         i.e. outStart + (rightEnd - leftStart)
    +   */
    +  protected int merge(final int leftStart, final int rightStart, final int 
rightEnd, final int outStart) {
    +    int l = leftStart;
    +    int r = rightStart;
    +    int o = outStart;
    +    while (l < rightStart && r < rightEnd) {
    +      if (compare(l, r) <= 0) {
    +        aux.set(o++, vector4.get(l++));
    +      } else {
    +        aux.set(o++, vector4.get(r++));
    +      }
    +    }
    +    while (l < rightStart) {
    +      aux.set(o++, vector4.get(l++));
    +    }
    +    while (r < rightEnd) {
    +      aux.set(o++, vector4.get(r++));
    +    }
    +    assert o == outStart + (rightEnd - leftStart);
    +    return o;
    +  }
    +
    +  /**
    +   * Returns the selection vector currently held in the vector4 field,
    +   * which setup() initializes with a wrapper of the SV4 passed to it.
    +   */
    +  @Override
    +  public SelectionVector4 getSV4() {
    +    return vector4;
    +  }
    +
    +  /**
    +   * Sort (really, merge) a set of pre-sorted runs to produce a combined
    +   * result set. Merging is done in the selection vector, record data does
    +   * not move.
    +   * <p>
    +   * Runs are merged pairwise in multiple passes, providing performance
    +   * of O(n * m * log(n)), where n = number of runs, m = number of records
    +   * per run.
    +   */
    +
    +  @Override
    +  public void sort(final VectorContainer container) {
    +    while (runStarts.size() > 1) {
    +      final int totalCount = this.vector4.getTotalCount();
    +
    +      // check if we're cancelled/failed recently
    +      if (!context.shouldContinue()) {
    +        return; }
    +
    +      int outIndex = 0;
    +      final Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
    +      newRunStarts.add(outIndex);
    +      final int size = runStarts.size();
    +      for (int i = 0; i < size / 2; i++) {
    --- End diff --
    
    Heck if I know... This is original code. Presumably this keeps looping 
until only one run remains. If we start with 3, we'll merge 1 & 2 to get 4. Now 
we have 3 and 4 which we merge to get the final one. I think...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to