[
https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15847967#comment-15847967
]
ASF GitHub Bot commented on DRILL-5080:
---------------------------------------
Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/717#discussion_r98593811
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/CopierHolder.java
---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import
org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Manages a {@link PriorityQueueCopier} instance produced from code
generation.
+ * Provides a wrapper around a copier "session" to simplify reading batches
+ * from the copier.
+ */
+
+public class CopierHolder {
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(CopierHolder.class);
+
+ private PriorityQueueCopier copier;
+
+ private final FragmentContext context;
+ private final BufferAllocator allocator;
+ private OperatorCodeGenerator opCodeGen;
+
+ public CopierHolder(FragmentContext context, BufferAllocator allocator,
OperatorCodeGenerator opCodeGen) {
+ this.context = context;
+ this.allocator = allocator;
+ this.opCodeGen = opCodeGen;
+ }
+
+ /**
+ * Start a merge operation using a temporary vector container. Used for
+ * intermediate merges.
+ *
+ * @param schema
+ * @param batchGroupList
+ * @param targetRecordCount
+ * @return
+ */
+
+ public CopierHolder.BatchMerger startMerge(BatchSchema schema, List<?
extends BatchGroup> batchGroupList, int targetRecordCount) {
+ return new BatchMerger(this, schema, batchGroupList,
targetRecordCount);
+ }
+
+ /**
+ * Start a merge operation using the specified vector container. Used for
+ * the final merge operation.
+ *
+ * @param schema
+ * @param batchGroupList
+ * @param outputContainer
+ * @param targetRecordCount
+ * @return
+ */
+ public CopierHolder.BatchMerger startFinalMerge(BatchSchema schema,
List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int
targetRecordCount) {
+ return new BatchMerger(this, schema, batchGroupList, outputContainer,
targetRecordCount);
+ }
+
+ /**
+ * Prepare a copier which will write a collection of vectors to disk.
The copier
+ * uses generated code to do the actual writes. If the copier has not
yet been
+ * created, generate code and create it. If it has been created, close
it and
+ * prepare it for a new collection of batches.
+ *
+ * @param batch the (hyper) batch of vectors to be copied
+ * @param batchGroupList same batches as above, but represented as a list
+ * of individual batches
+ * @param outputContainer the container into which to copy the batches
+ * @param allocator allocator to use to allocate memory in the operation
+ */
+
+ @SuppressWarnings("unchecked")
+ private void createCopier(VectorAccessible batch, List<? extends
BatchGroup> batchGroupList, VectorContainer outputContainer) {
+ if (copier != null) {
+ opCodeGen.closeCopier();
+ } else {
+ copier = opCodeGen.getCopier(batch);
+ }
+
+ // Initialize the value vectors for the output container using the
+ // allocator provided
+
+ for (VectorWrapper<?> i : batch) {
+ @SuppressWarnings("resource")
+ ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
+ outputContainer.add(v);
+ }
+ try {
+ copier.setup(context, allocator, batch, (List<BatchGroup>)
batchGroupList, outputContainer);
+ } catch (SchemaChangeException e) {
+ throw UserException.unsupportedError(e)
+ .message("Unexpected schema change - likely code error.")
+ .build(logger);
+ }
+ }
+
+ public BufferAllocator getAllocator() { return allocator; }
+
+ public void close() {
+ opCodeGen.closeCopier();
+ copier = null;
+ }
+
+ /**
+ * We've gathered a set of batches, each of which has been sorted. The
batches
+ * may have passed through a filter and thus may have "holes" where rows
have
+ * been filtered out. We will spill records in blocks of
targetRecordCount.
+ * To prepare, copy that many records into an outputContainer as a set of
+ * contiguous values in new vectors. The result is a single batch with
+ * vectors that combine a collection of input batches up to the
+ * given threshold.
+ * <p>
+ * Input (selection vector, data vector):<pre>
+ * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
+ * [eh_ad_ibf] [r_qm_kn_p]</pre>
+ * <p>
+ * Output (assuming blocks of 5 records, data vectors only):<pre>
+ * [abcde] [fhikm] [npqr]</pre>
+ * <p>
+ * The copying operation does a merge as well: copying
+ * values from the sources in ordered fashion.
+ * <pre>
+ * Input: [aceg] [bdfh]
+ * Output: [abcdefgh]</pre>
+ * <p>
+ * Here we bind the copier to the batchGroupList of sorted, buffered
batches
+ * to be merged. We bind the copier output to outputContainer: the
copier will write its
+ * merged "batches" of records to that container.
+ * <p>
+ * Calls to the {@link #next()} method sequentially return merged batches
+ * of the desired row count.
+ */
+
+ public static class BatchMerger implements SortResults, AutoCloseable {
+
+ private CopierHolder holder;
+ private VectorContainer hyperBatch;
+ private VectorContainer outputContainer;
+ private int targetRecordCount;
+ private int copyCount;
+ private int batchCount;
+
+ /**
+ * Creates a merger with a temporary output container.
+ *
+ * @param holder
+ * @param batchGroupList
+ * @param targetRecordCount
+ */
+ private BatchMerger(CopierHolder holder, BatchSchema schema, List<?
extends BatchGroup> batchGroupList, int targetRecordCount) {
+ this(holder, schema, batchGroupList, new VectorContainer(),
targetRecordCount);
+ }
+
+ /**
+ * Creates a merger with the specified output container
+ *
+ * @param holder
+ * @param batchGroupList
--- End diff --
ditto ...
> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
> Key: DRILL-5080
> URL: https://issues.apache.org/jira/browse/DRILL-5080
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.8.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Fix For: 1.10
>
> Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that
> works to a clearly-defined memory limit. Attached is a design specification
> for the work.
> The project will include fixing a number of bugs related to the external
> sort, included as sub-tasks of this umbrella task.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)