Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/717#discussion_r99066218
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
 ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl.xsort.managed;
    +
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.drill.common.exceptions.UserException;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.ops.FragmentContext;
    +import 
org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
    +import org.apache.drill.exec.record.BatchSchema;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.record.VectorAccessible;
    +import org.apache.drill.exec.record.VectorContainer;
    +import org.apache.drill.exec.record.VectorWrapper;
    +import org.apache.drill.exec.vector.ValueVector;
    +
    +import com.google.common.base.Stopwatch;
    +
    +/**
    + * Manages a {@link PriorityQueueCopier} instance produced from code 
generation.
    + * Provides a wrapper around a copier "session" to simplify reading batches
    + * from the copier.
    + */
    +
    +public class CopierHolder {
    +  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CopierHolder.class);
    +
    +  private PriorityQueueCopier copier;
    +
    +  private final FragmentContext context;
    +  private final BufferAllocator allocator;
    +  private OperatorCodeGenerator opCodeGen;
    +
    +  public CopierHolder(FragmentContext context, BufferAllocator allocator, 
OperatorCodeGenerator opCodeGen) {
    +    this.context = context;
    +    this.allocator = allocator;
    +    this.opCodeGen = opCodeGen;
    +  }
    +
    +  /**
    +   * Start a merge operation using a temporary vector container. Used for
    +   * intermediate merges.
    +   *
    +   * @param schema
    +   * @param batchGroupList
    +   * @param targetRecordCount
    +   * @return
    +   */
    +
    +  public CopierHolder.BatchMerger startMerge(BatchSchema schema, List<? 
extends BatchGroup> batchGroupList, int targetRecordCount) {
    +    return new BatchMerger(this, schema, batchGroupList, 
targetRecordCount);
    +  }
    +
    +  /**
    +   * Start a merge operation using the specified vector container. Used for
    +   * the final merge operation.
    +   *
    +   * @param schema
    +   * @param batchGroupList
    +   * @param outputContainer
    +   * @param targetRecordCount
    +   * @return
    +   */
    +  public CopierHolder.BatchMerger startFinalMerge(BatchSchema schema, 
List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int 
targetRecordCount) {
    +    return new BatchMerger(this, schema, batchGroupList, outputContainer, 
targetRecordCount);
    +  }
    +
    +  /**
    +   * Prepare a copier which will write a collection of vectors to disk. 
The copier
    +   * uses generated code to do the actual writes. If the copier has not 
yet been
    +   * created, generate code and create it. If it has been created, close 
it and
    +   * prepare it for a new collection of batches.
    +   *
    +   * @param batch the (hyper) batch of vectors to be copied
    +   * @param batchGroupList same batches as above, but represented as a list
    +   * of individual batches
    +   * @param outputContainer the container into which to copy the batches
    +   * @param allocator allocator to use to allocate memory in the operation
    +   */
    +
    +  @SuppressWarnings("unchecked")
    +  private void createCopier(VectorAccessible batch, List<? extends 
BatchGroup> batchGroupList, VectorContainer outputContainer) {
    +    if (copier != null) {
    +      opCodeGen.closeCopier();
    +    } else {
    +      copier = opCodeGen.getCopier(batch);
    +    }
    +
    +    // Initialize the value vectors for the output container using the
    +    // allocator provided
    +
    +    for (VectorWrapper<?> i : batch) {
    +      @SuppressWarnings("resource")
    +      ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
    +      outputContainer.add(v);
    +    }
    +    try {
    +      copier.setup(context, allocator, batch, (List<BatchGroup>) 
batchGroupList, outputContainer);
    +    } catch (SchemaChangeException e) {
    +      throw UserException.unsupportedError(e)
    +            .message("Unexpected schema change - likely code error.")
    +            .build(logger);
    +    }
    +  }
    +
    +  public BufferAllocator getAllocator() { return allocator; }
    +
    +  public void close() {
    +    opCodeGen.closeCopier();
    +    copier = null;
    +  }
    +
    +  /**
    +   * We've gathered a set of batches, each of which has been sorted. The 
batches
    +   * may have passed through a filter and thus may have "holes" where rows 
have
    +   * been filtered out. We will spill records in blocks of 
targetRecordCount.
    +   * To prepare, copy that many records into an outputContainer as a set of
    +   * contiguous values in new vectors. The result is a single batch with
    +   * vectors that combine a collection of input batches up to the
    +   * given threshold.
    +   * <p>
    +   * Input (selection vector, data vector):<pre>
    +   * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
    +   * [eh_ad_ibf]     [r_qm_kn_p]</pre>
    +   * <p>
    +   * Output (assuming blocks of 5 records, data vectors only):<pre>
    +   * [abcde] [fhikm] [npqr]</pre>
    +   * <p>
    +   * The copying operation does a merge as well: copying
    +   * values from the sources in ordered fashion.
    +   * <pre>
    +   * Input:  [aceg] [bdfh]
    +   * Output: [abcdefgh]</pre>
    +   * <p>
    +   * Here we bind the copier to the batchGroupList of sorted, buffered 
batches
    +   * to be merged. We bind the copier output to outputContainer: the 
copier will write its
    +   * merged "batches" of records to that container.
    +   * <p>
    +   * Calls to the {@link #next()} method sequentially return merged batches
    +   * of the desired row count.
    +    */
    +
    +  public static class BatchMerger implements SortResults, AutoCloseable {
    +
    +    private CopierHolder holder;
    +    private VectorContainer hyperBatch;
    +    private VectorContainer outputContainer;
    +    private int targetRecordCount;
    +    private int copyCount;
    +    private int batchCount;
    +
    +    /**
    +     * Creates a merger with a temporary output container.
    +     *
    +     * @param holder
    +     * @param batchGroupList
    +     * @param targetRecordCount
    +     */
    +    private BatchMerger(CopierHolder holder, BatchSchema schema, List<? 
extends BatchGroup> batchGroupList, int targetRecordCount) {
    +      this(holder, schema, batchGroupList, new VectorContainer(), 
targetRecordCount);
    +    }
    +
    +    /**
    +     * Creates a merger with the specified output container
    +     *
    +     * @param holder
    +     * @param batchGroupList
    --- End diff --
    
    Fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to