http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java index cb5e20e..f9835f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java @@ -36,7 +36,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{ public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{ // we pass in the local hyperBatch since that is where we'll be reading data. - Preconditions.checkArgument(vector4 != null); + Preconditions.checkNotNull(vector4); this.vector4 = vector4; doSetup(context, hyperBatch, null); }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 46c747c..c13838c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -31,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector; * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids * provided utilizing getValueVectorId(); */ -public interface RecordBatch extends VectorAccessible, Iterable<VectorWrapper<?>>{ +public interface RecordBatch extends VectorAccessible { /** * Describes the outcome of a RecordBatch being incremented forward. 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 91aa70b..462c00a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -189,4 +189,8 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess w.clear(); } } + + public int getNumberOfColumns() { + return this.wrappers.size(); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java deleted file mode 100644 index 063c00a..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.util; - -import com.beust.jcommander.internal.Lists; -import org.apache.commons.lang.StringUtils; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.record.VectorAccessible; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.vector.ValueVector; - -import java.util.List; - -public class BatchPrinter { - public static void printHyperBatch(VectorAccessible batch) { - List<String> columns = Lists.newArrayList(); - List<ValueVector> vectors = Lists.newArrayList(); - int numBatches = 0; - for (VectorWrapper vw : batch) { - columns.add(vw.getValueVectors()[0].getField().getName()); - numBatches = vw.getValueVectors().length; - } - int width = columns.size(); - for (int i = 0; i < numBatches; i++) { - int rows = batch.iterator().next().getValueVectors()[i].getMetadata().getValueCount(); - for (int j = 0; j < rows; j++) { - for (VectorWrapper vw : batch) { - Object o = vw.getValueVectors()[i].getAccessor().getObject(j); - if (o instanceof byte[]) { - String value = new String((byte[]) o); - System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); - } else { - String value = o.toString(); - System.out.printf("| %-15s",value.length() <= 15 ? 
value : value.substring(0,14)); - } - } - System.out.printf("|\n"); - } - } - System.out.printf("|\n"); - } - public static void printBatch(VectorAccessible batch) { - List<String> columns = Lists.newArrayList(); - List<ValueVector> vectors = Lists.newArrayList(); - for (VectorWrapper vw : batch) { - columns.add(vw.getValueVector().getField().getName()); - vectors.add(vw.getValueVector()); - } - int width = columns.size(); - int rows = vectors.get(0).getMetadata().getValueCount(); - for (int row = 0; row < rows; row++) { - if (row%50 == 0) { - System.out.println(StringUtils.repeat("-", width * 17 + 1)); - for (String column : columns) { - System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14)); - } - System.out.printf("|\n"); - System.out.println(StringUtils.repeat("-", width*17 + 1)); - } - for (ValueVector vv : vectors) { - Object o = vv.getAccessor().getObject(row); - if (o instanceof byte[]) { - String value = new String((byte[]) o); - System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); - } else { - String value = o.toString(); - System.out.printf("| %-15s",value.length() <= 15 ? 
value : value.substring(0,14)); - } - } - System.out.printf("|\n"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java index f089932..39ec720 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java @@ -18,26 +18,21 @@ package org.apache.drill.exec.cache; import com.beust.jcommander.internal.Lists; -import com.hazelcast.core.MultiMap; -import com.hazelcast.nio.FastByteArrayInputStream; -import com.hazelcast.nio.FastByteArrayOutputStream; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.vector.*; import org.junit.Test; -import java.io.ByteArrayInputStream; -import java.io.DataOutput; import java.util.List; public class TestVectorCache { @@ -69,20 +64,17 @@ public class TestVectorCache { intVector.getMutator().setValueCount(4); binVector.getMutator().setValueCount(4); - VectorWrap wrap = new 
VectorWrap(vectorList); - /* - FastByteArrayOutputStream out = new FastByteArrayOutputStream(); - wrap.writeData(out); - FastByteArrayInputStream in = new FastByteArrayInputStream(out.getBytes()); - VectorWrap newWrap = new VectorWrap(); - newWrap.readData(in); - */ - MultiMap<String, VectorWrap> mmap = cache.getMultiMap("testMap"); + VectorContainer container = new VectorContainer(); + container.addCollection(vectorList); + VectorContainerSerializable wrap = new VectorContainerSerializable(container); + + DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class); mmap.put("vectors", wrap); - VectorWrap newWrap = mmap.get("vectors").iterator().next(); + VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next(); - List<ValueVector> vectors = newWrap.get(); - for (ValueVector vv : vectors) { + VectorContainer newContainer = newWrap.get(); + for (VectorWrapper w : newContainer) { + ValueVector vv = w.getValueVector(); int values = vv.getAccessor().getValueCount(); for (int i = 0; i < values; i++) { Object o = vv.getAccessor().getObject(i); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java index 850a40a..d35a09e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java @@ -46,12 +46,22 @@ import java.util.List; import static 
org.junit.Assert.assertEquals; - +/** + * Tests the OrderedPartitionExchange Operator + */ public class TestOrderedPartitionExchange extends PopUnitTestBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOrderedPartitionExchange.class); + /** + * Starts two drillbits and runs a physical plan with a Mock scan, project, OrderedPartitionExchange, Union Exchange, + * and sort. The final sort is done first on the partition column, and verifies that the partitions are correct, in that + * all rows in partition 0 should come in the sort order before any row in partition 1, etc. Also verifies that the standard + * deviation of the size of the partitions is less than one tenth the mean size of the partitions, because we expect all + * the partitions to be roughly equal in size. + * @throws Exception + */ @Test - public void twoBitTwoExchangeTwoEntryRun() throws Exception { + public void twoBitTwoExchangeRun() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java new file mode 100644 index 0000000..aa68752 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/BatchPrinter.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util; + +import com.beust.jcommander.internal.Lists; +import org.apache.commons.lang.StringUtils; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.List; + +/** + * This is a tool for printing the content of record batches to screen. Used for debugging. + */ +public class BatchPrinter { + public static void printHyperBatch(VectorAccessible batch) { + List<String> columns = Lists.newArrayList(); + List<ValueVector> vectors = Lists.newArrayList(); + int numBatches = 0; + for (VectorWrapper vw : batch) { + columns.add(vw.getValueVectors()[0].getField().getName()); + numBatches = vw.getValueVectors().length; + } + int width = columns.size(); + for (int i = 0; i < numBatches; i++) { + int rows = batch.iterator().next().getValueVectors()[i].getMetadata().getValueCount(); + for (int j = 0; j < rows; j++) { + for (VectorWrapper vw : batch) { + Object o = vw.getValueVectors()[i].getAccessor().getObject(j); + if (o instanceof byte[]) { + String value = new String((byte[]) o); + System.out.printf("| %-15s",value.length() <= 15 ? 
value : value.substring(0, 14)); + } else { + String value = o.toString(); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14)); + } + } + System.out.printf("|\n"); + } + } + System.out.printf("|\n"); + } + public static void printBatch(VectorAccessible batch) { + List<String> columns = Lists.newArrayList(); + List<ValueVector> vectors = Lists.newArrayList(); + for (VectorWrapper vw : batch) { + columns.add(vw.getValueVector().getField().getName()); + vectors.add(vw.getValueVector()); + } + int width = columns.size(); + int rows = vectors.get(0).getMetadata().getValueCount(); + for (int row = 0; row < rows; row++) { + if (row%50 == 0) { + System.out.println(StringUtils.repeat("-", width * 17 + 1)); + for (String column : columns) { + System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14)); + } + System.out.printf("|\n"); + System.out.println(StringUtils.repeat("-", width*17 + 1)); + } + for (ValueVector vv : vectors) { + Object o = vv.getAccessor().getObject(row); + if (o instanceof byte[]) { + String value = new String((byte[]) o); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); + } else { + String value = o.toString(); + System.out.printf("| %-15s",value.length() <= 15 ? 
value : value.substring(0,14)); + } + } + System.out.printf("|\n"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/exec/java-exec/src/test/resources/sender/ordered_exchange.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/sender/ordered_exchange.json b/exec/java-exec/src/test/resources/sender/ordered_exchange.json index 1b3d41e..1290406 100644 --- a/exec/java-exec/src/test/resources/sender/ordered_exchange.json +++ b/exec/java-exec/src/test/resources/sender/ordered_exchange.json @@ -43,7 +43,9 @@ {expr: "col1", order: "ASC"}, {expr: "col2", order: "DESC"} ], - ref: "partition" + ref: "partition", + recordsToSample: 15000, + completionFactor: 0.8 }, { @id: 4, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java b/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java index 8c26a65..f51021e 100644 --- a/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java +++ b/sqlparser/src/main/java/org/apache/drill/jdbc/DrillTable.java @@ -184,7 +184,7 @@ public class DrillTable extends BaseQueryable<Object> implements TranslatableTab // JavaTypeFactory typeFactory, // Schema schema, // String name, -// Map<String, Object> operand, +// DistributedMap<String, Object> operand, // RelDataType rowType) { // final ClasspathRSE.ClasspathRSEConfig rseConfig = new ClasspathRSE.ClasspathRSEConfig(); // final ClasspathInputConfig inputConfig = new ClasspathInputConfig(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java ---------------------------------------------------------------------- diff --git 
a/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java b/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java index 6518f42..7b0b24d 100644 --- a/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java +++ b/sqlparser/src/main/java/org/apache/drill/optiq/EnumerableDrill.java @@ -147,7 +147,7 @@ public class EnumerableDrill<E> extends AbstractEnumerable<E> implements Enumera } /** - * Converts a JSON document, represented as an array of bytes, into a Java object (consisting of Map, List, String, + * Converts a JSON document, represented as an array of bytes, into a Java object (consisting of DistributedMap, List, String, * Integer, Double, Boolean). */ static Object parseJson(byte[] bytes) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fe94aa81/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java b/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java index 0cacfc9..f33b54b 100644 --- a/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java +++ b/sqlparser/src/main/java/org/apache/drill/sql/client/ref/DrillRefImpl.java @@ -211,7 +211,7 @@ public class DrillRefImpl<E> { } /** - * Converts a JSON document, represented as an array of bytes, into a Java object (consisting of Map, List, String, + * Converts a JSON document, represented as an array of bytes, into a Java object (consisting of DistributedMap, List, String, * Integer, Double, Boolean). */ static Object parseJson(byte[] bytes) {
