DRILL-648: Multiple sort failure in single mode when running TPCH queries
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/01bf8496 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/01bf8496 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/01bf8496 Branch: refs/heads/master Commit: 01bf8496b217781521c943cc1a9a38ed9f841288 Parents: 5484a55 Author: Steven Phillips <[email protected]> Authored: Tue May 27 15:26:14 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed May 28 09:18:19 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/xsort/ExternalSortBatch.java | 7 +- .../drill/exec/planner/physical/SortPrel.java | 2 +- .../java/org/apache/drill/BaseTestQuery.java | 3 + .../impl/xsort/TestSimpleExternalSort.java | 101 ++++++++++++------- .../xsort/one_key_sort_descending_sv2.json | 54 ++++++++++ 5 files changed, 126 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 4b6c37d..2289680 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -45,6 +45,7 @@ import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; import org.apache.drill.exec.record.*; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.util.Utilities; @@ -222,7 +223,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { w.start(); sorter.sort(sv2); // logger.debug("Took {} us to sort {} records", w.elapsed(TimeUnit.MICROSECONDS), sv2.getCount()); - batchGroups.add(new BatchGroup(new RecordBatchData(incoming).getContainer(), sv2)); + RecordBatchData rbd = new RecordBatchData(incoming); + if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) { + rbd.setSv2(sv2); + } + batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2())); batchesSinceLastSpill++; if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE) { mergeAndSpill(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index 464e1bb..3e1bcac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -100,7 +100,7 @@ public class SortPrel extends SortRel implements Prel { @Override public SelectionVectorMode[] getSupportedEncodings() { - return SelectionVectorMode.DEFAULT; // should support SV2 but there is a bug, DRILL-648 + return SelectionVectorMode.NONE_AND_TWO; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index e7bc87d..a47796c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -159,6 +159,9 @@ public class BaseTestQuery extends ExecTest{ protected void testPhysicalFromFile(String file) throws Exception{ testPhysical(getFile(file)); } + protected List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception { + return testRunAndReturn(QueryType.PHYSICAL, getFile(file)); + } protected void testLogicalFromFile(String file) throws Exception{ testLogical(getFile(file)); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java index 42fe703..221413e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java @@ -20,14 +20,13 @@ package org.apache.drill.exec.physical.impl.xsort; import com.google.common.base.Charsets; import com.google.common.io.Files; +import org.apache.drill.BaseTestQuery; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.util.FileUtils; import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.client.DrillClient; -import org.apache.drill.exec.pop.PopUnitTestBase; -import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.server.Drillbit; @@ -43,7 +42,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class TestSimpleExternalSort extends PopUnitTestBase { +public class TestSimpleExternalSort extends BaseTestQuery { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSimpleExternalSort.class); DrillConfig c = DrillConfig.create(); @@ -51,54 +50,78 @@ public class TestSimpleExternalSort extends PopUnitTestBase { @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(80000); @Test - public void sortOneKeyDescendingMergeSort() throws Throwable{ - RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + public void mergeSortWithSv2() throws Exception { + List<QueryResultBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending_sv2.json"); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + assertEquals(500000, count); - try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); - Drillbit bit2 = new Drillbit(CONFIG, serviceSet); - DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());) { + long previousBigInt = Long.MAX_VALUE; - bit1.run(); - bit2.run(); - client.connect(); - List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, - Files.toString(FileUtils.getResourceAsFile("/xsort/one_key_sort_descending.json"), - Charsets.UTF_8)); - int count = 0; - for(QueryResultBatch b : results) { - if (b.getHeader().getRowCount() != 0) - count += b.getHeader().getRowCount(); + int recordCount = 0; + int batchCount = 0; + + for (QueryResultBatch b : results) { + if (b.getHeader().getRowCount() == 0) break; + batchCount++; + RecordBatchLoader loader = new RecordBatchLoader(allocator); + loader.load(b.getHeader().getDef(),b.getData()); + BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, + loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector(); + + + BigIntVector.Accessor a1 = c1.getAccessor(); + + for(int i =0; i < c1.getAccessor().getValueCount(); i++){ + recordCount++; + assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i)); + previousBigInt = a1.get(i); } - assertEquals(1000000, count); + loader.clear(); + b.release(); + } - long previousBigInt = Long.MAX_VALUE; + System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount)); + } - int recordCount = 0; - int batchCount = 0; + @Test + public void sortOneKeyDescendingMergeSort() throws Throwable{ + List<QueryResultBatch> results = testPhysicalFromFileWithResults("xsort/one_key_sort_descending.json"); + int count = 0; + for(QueryResultBatch b : results) { + if (b.getHeader().getRowCount() != 0) + count += b.getHeader().getRowCount(); + } + assertEquals(1000000, count); - for (QueryResultBatch b : results) { - if (b.getHeader().getRowCount() == 0) break; - batchCount++; - RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator()); - loader.load(b.getHeader().getDef(),b.getData()); - BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector(); + long previousBigInt = Long.MAX_VALUE; + int recordCount = 0; + int batchCount = 0; - BigIntVector.Accessor a1 = c1.getAccessor(); -// IntVector.Accessor a2 = c2.getAccessor(); + for (QueryResultBatch b : results) { + if (b.getHeader().getRowCount() == 0) break; + batchCount++; + RecordBatchLoader loader = new RecordBatchLoader(allocator); + loader.load(b.getHeader().getDef(),b.getData()); + BigIntVector c1 = (BigIntVector) loader.getValueAccessorById(BigIntVector.class, loader.getValueVectorId(new SchemaPath("blue", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector(); - for(int i =0; i < c1.getAccessor().getValueCount(); i++){ - recordCount++; - assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i)); - previousBigInt = a1.get(i); - } - loader.clear(); - b.release(); - } - System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount)); + BigIntVector.Accessor a1 = c1.getAccessor(); + for(int i =0; i < c1.getAccessor().getValueCount(); i++){ + recordCount++; + assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt >= a1.get(i)); + previousBigInt = a1.get(i); + } + loader.clear(); + b.release(); } + + System.out.println(String.format("Sorted %,d records in %d batches.", recordCount, batchCount)); } @Test http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/01bf8496/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json new file mode 100644 index 0000000..d10aa96 --- /dev/null +++ b/exec/java-exec/src/test/resources/xsort/one_key_sort_descending_sv2.json @@ -0,0 +1,54 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + entries:[ + {records: 1000000, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id: 2, + pop: "project", + child: 1, + exprs: [ + { ref: "blue", expr: "randomBigInt(100000)" } + ] + }, + { + @id: 3, + pop: "filter", + expr: "alternate()", + child: 2 + }, + { + @id:4, + child: 3, + pop:"external-sort", + orderings: [ + {expr: "blue", order : "DESC"} + ] + }, + { + @id:5, + child: 4, + pop:"selection-vector-remover" + }, + { + @id: 6, + child: 5, + pop: "screen" + } + ] +} \ No newline at end of file
