[
https://issues.apache.org/jira/browse/DRILL-5080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849323#comment-15849323
]
ASF GitHub Bot commented on DRILL-5080:
---------------------------------------
Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/717#discussion_r99027623
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/InMemorySorter.java
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.util.LinkedList;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
+import
org.apache.drill.exec.physical.impl.xsort.managed.ExternalSortBatch.SortResults;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+public class InMemorySorter implements SortResults {
+ private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(InMemorySorter.class);
+
+ private SortRecordBatchBuilder builder;
+ private MSorter mSorter;
+ private final FragmentContext context;
+ private final BufferAllocator oAllocator;
+ private SelectionVector4 sv4;
+ private final OperatorCodeGenerator opCg;
+ private int batchCount;
+
+ public InMemorySorter(FragmentContext context, BufferAllocator
allocator, OperatorCodeGenerator opCg) {
+ this.context = context;
+ this.oAllocator = allocator;
+ this.opCg = opCg;
+ }
+
+ public SelectionVector4 sort(LinkedList<BatchGroup.InputBatch>
batchGroups, VectorAccessible batch,
+ VectorContainer destContainer) {
+ if (builder != null) {
+ builder.clear();
+ builder.close();
+ }
+ builder = new SortRecordBatchBuilder(oAllocator);
+
+ for (BatchGroup.InputBatch group : batchGroups) {
+ RecordBatchData rbd = new RecordBatchData(group.getContainer(),
oAllocator);
+ rbd.setSv2(group.getSv2());
+ builder.add(rbd);
+ }
+ batchGroups.clear();
+
+ try {
+ builder.build(context, destContainer);
+ sv4 = builder.getSv4();
+ mSorter = opCg.createNewMSorter(batch);
+ mSorter.setup(context, oAllocator, sv4, destContainer,
sv4.getCount());
+ } catch (SchemaChangeException e) {
+ throw UserException.unsupportedError(e)
+ .message("Unexpected schema change - likely code error.")
+ .build(logger);
+ }
+
+ // For testing memory-leaks, inject exception after mSorter finishes
setup
+
ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(),
ExternalSortBatch.INTERRUPTION_AFTER_SETUP);
+ mSorter.sort(destContainer);
+
+ // sort may have prematurely exited due to should continue returning
false.
+ if (!context.shouldContinue()) {
+ return null;
+ }
+
+ // For testing memory-leak purpose, inject exception after mSorter
finishes sorting
+
ExternalSortBatch.injector.injectUnchecked(context.getExecutionControls(),
ExternalSortBatch.INTERRUPTION_AFTER_SORT);
+ sv4 = mSorter.getSV4();
+
+ destContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
+ return sv4;
+ }
+
+ @Override
+ public boolean next() {
+ boolean more = sv4.next();
+ if (more) { batchCount++; }
+ return more;
+ }
+
+ @Override
+ public void close() {
+ if (builder != null) {
+ builder.clear();
+ builder.close();
+ }
+ if (mSorter != null) {
+ mSorter.clear();
+ }
+ }
+
+ @Override
+ public int getBatchCount() {
+ return batchCount;
+ }
+
+ @Override
+ public int getRecordCount() {
+ return sv4.getTotalCount();
+ }
+}
--- End diff --
github noticed no newline at the end of this file; maybe should add one ...
> Create a memory-managed version of the External Sort operator
> -------------------------------------------------------------
>
> Key: DRILL-5080
> URL: https://issues.apache.org/jira/browse/DRILL-5080
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.8.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Fix For: 1.10.0
>
> Attachments: ManagedExternalSortDesign.pdf
>
>
> We propose to create a "managed" version of the external sort operator that
> works to a clearly-defined memory limit. Attached is a design specification
> for the work.
> The project will include fixing a number of bugs related to the external
> sort, include as sub-tasks of this umbrella task.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)