[
https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849329#comment-15849329
]
ASF GitHub Bot commented on DRILL-5080:
---------------------------------------
Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/717#discussion_r99036541
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.Queue;
+
+import javax.inject.Named;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BaseAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.hadoop.util.IndexedSortable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+
+import io.netty.buffer.DrillBuf;
+
+public abstract class MSortTemplate implements MSorter, IndexedSortable {
+// private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
+
+ private SelectionVector4 vector4;
+ private SelectionVector4 aux;
+ @SuppressWarnings("unused")
+ private long compares;
+
+ /**
+ * Holds offsets into the SV4 of the start of each batch
+ * (sorted run.)
+ */
+
+ private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
+ private FragmentContext context;
+
+ /**
+ * Controls the maximum size of batches exposed to downstream
+ */
+ private int desiredRecordBatchCount;
+
+ /**
+ * Prepares this sorter to merge the sorted runs referenced by an SV4.
+ * <p>
+ * Wraps the incoming SV4 via {@code createNewWrapperCurrent()} and then
+ * clears the caller's instance. It calls {@code doSetup()} against
+ * {@code hyperBatch} and records the SV4 offset at which each batch
+ * (sorted run) begins. It then allocates the auxiliary SV4 that
+ * {@code merge()} writes into.
+ *
+ * @param context fragment context, kept so that {@code sort()} can check
+ * whether it should continue
+ * @param allocator allocator for the auxiliary SV4 buffer
+ * ({@code 4 * totalCount} bytes)
+ * @param vector4 SV4 over the hyper-batch; must not be null. Its entries
+ * must be grouped by batch index in ascending order with no gaps
+ * @param hyperBatch the local hyper-batch passed to {@code doSetup()},
+ * from which data is read
+ * @param outputBatchSize requested maximum records per output batch;
+ * further capped at {@code Character.MAX_VALUE} and at the total
+ * record count
+ * @throws SchemaChangeException if thrown by {@code doSetup()}
+ * @throws NullPointerException if {@code vector4} is null
+ * @throws UnsupportedOperationException if the batch index of consecutive
+ * SV4 entries skips a batch or goes backwards
+ */
+ @Override
+ public void setup(final FragmentContext context, final BufferAllocator
allocator, final SelectionVector4 vector4,
+ final VectorContainer hyperBatch, int outputBatchSize)
throws SchemaChangeException{
+ // We pass in the local hyperBatch since that is where we'll be reading data.
+ Preconditions.checkNotNull(vector4);
+ // Work on a new wrapper of the incoming SV4, then clear the caller's
+ // instance. NOTE(review): presumably this hands buffer ownership to this
+ // sorter; confirm against SelectionVector4.createNewWrapperCurrent().
+ this.vector4 = vector4.createNewWrapperCurrent();
+ this.context = context;
+ vector4.clear();
+ doSetup(context, hyperBatch, null);
+
+ // Populate the queue with the offset in the SV4 of each
+ // batch. Note that this is expensive as it requires a scan
+ // of all items to be sorted: potentially millions.
+ // NOTE(review): offset 0 is always recorded as the first run start. If the
+ // first entry's batch index were 1 rather than 0, offset 0 would be added
+ // twice, so this assumes the SV4 begins with batch 0.
+
+ runStarts.add(0);
+ int batch = 0;
+ final int totalCount = this.vector4.getTotalCount();
+ for (int i = 0; i < totalCount; i++) {
+ // The upper 16 bits of an SV4 entry give its batch index.
+ final int newBatch = this.vector4.get(i) >>> 16;
+ if (newBatch == batch) {
+ continue;
+ } else if (newBatch == batch + 1) {
+ // First entry of the next batch: a new sorted run starts here.
+ runStarts.add(i);
+ batch = newBatch;
+ } else {
+ // Batches must appear in ascending order with no gaps.
+ throw new UnsupportedOperationException(String.format("Missing
batch. batch: %d newBatch: %d", batch, newBatch));
+ }
+ }
+
+ // Create a temporary SV4 to hold the merged results.
+ // Output batches are capped at Character.MAX_VALUE (65535) records and at
+ // the total record count. Presumably the cap exists because in-batch
+ // offsets are 16 bits wide; confirm.
+
+ // The buffer is handed to the aux SV4 below, hence the "resource"
+ // suppression (presumably aux is released elsewhere; confirm).
+ @SuppressWarnings("resource")
+ final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
+ desiredRecordBatchCount = Math.min(outputBatchSize,
Character.MAX_VALUE);
+ desiredRecordBatchCount = Math.min(desiredRecordBatchCount,
totalCount);
+ aux = new SelectionVector4(drillBuf, totalCount,
desiredRecordBatchCount);
+ }
+
+ /**
+ * Returns how much memory the MSorter needs for its own purposes when
+ * sorting the given number of records. This is used in ExternalSortBatch
+ * to decide whether to spill.
+ *
+ * @param recordCount number of records to be sorted
+ * @return bytes needed for the sorter's SV4 buffer: 4 bytes per record,
+ * rounded up by {@code BaseAllocator.nextPowerOfTwo}
+ */
+ public static long memoryNeeded(final int recordCount) {
+ // We need 4 bytes (SV4) for each record.
+ // The memory allocator will round this to the next
+ // power of 2.
+ // NOTE(review): recordCount * 4 is computed in int arithmetic and
+ // overflows once recordCount reaches 2^29; confirm callers stay below that.
+
+ return BaseAllocator.nextPowerOfTwo(recordCount * 4);
+ }
+
+ /**
+ * Merges two adjacent regions of the selection vector 4 (a left and a
+ * right) into a combined output region in the auxiliary selection vector.
+ * Only SV4 entries are copied; record data does not move.
+ * <p>
+ * On ties ({@code compare(l, r) == 0}), the left entry is taken first.
+ *
+ * @param leftStart first SV4 index of the left region
+ * @param rightStart first SV4 index of the right region; also the
+ * exclusive end of the left region
+ * @param rightEnd exclusive end of the right region
+ * @param outStart first index in the auxiliary SV4 to write to
+ * @return the auxiliary SV4 index just past the last entry written,
+ * i.e. {@code outStart + (rightEnd - leftStart)}
+ */
+ protected int merge(final int leftStart, final int rightStart, final int
rightEnd, final int outStart) {
+ int l = leftStart;
+ int r = rightStart;
+ int o = outStart;
+ // Take the lesser head entry until one region is exhausted.
+ while (l < rightStart && r < rightEnd) {
+ if (compare(l, r) <= 0) {
+ aux.set(o++, vector4.get(l++));
+ } else {
+ aux.set(o++, vector4.get(r++));
+ }
+ }
+ // Copy whatever remains of the left region, then of the right region.
+ while (l < rightStart) {
+ aux.set(o++, vector4.get(l++));
+ }
+ while (r < rightEnd) {
+ aux.set(o++, vector4.get(r++));
+ }
+ assert o == outStart + (rightEnd - leftStart);
+ return o;
+ }
+
+ /**
+ * Returns the SV4 this sorter works on. Initially this is the wrapper
+ * created in {@code setup()}.
+ * NOTE(review): whether it holds the final sorted order after
+ * {@code sort()} depends on how sort() uses the auxiliary SV4; confirm.
+ */
+ @Override
+ public SelectionVector4 getSV4() {
+ return vector4;
+ }
+
+ /**
+ * Sort (really, merge) a set of pre-sorted runs to produce a combined
+ * result set. Merging is done in the selection vector, record data does
+ * not move.
+ * <p>
+ * Runs are merge pairwise in multiple passes, providing performance
+ * of O(n * m * log(n)), where n = number of runs, m = number of records
+ * per run.
+ */
+
+ @Override
+ public void sort(final VectorContainer container) {
+ while (runStarts.size() > 1) {
+ final int totalCount = this.vector4.getTotalCount();
+
+ // check if we're cancelled/failed recently
+ if (!context.shouldContinue()) {
+ return; }
+
+ int outIndex = 0;
+ final Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue();
+ newRunStarts.add(outIndex);
+ final int size = runStarts.size();
+ for (int i = 0; i < size / 2; i++) {
--- End diff --
What happens when "size" is odd? The loop only iterates size / 2 times, so one
run is left unpaired in this pass. How is that last run handled? Maybe add a
comment to explain.
> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
> Key: DRILL-5080
> URL: https://issues.apache.org/jira/browse/DRILL-5080
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.8.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Fix For: 1.10.0
>
> Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that
> works to a clearly defined memory limit. Attached is a design specification
> for the work.
> The project will also fix a number of bugs related to the external
> sort; these are included as sub-tasks of this umbrella task.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)