http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java index 43ed7e4..cb5e20e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.sort; import javax.inject.Named; +import com.google.common.base.Preconditions; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; @@ -33,9 +34,10 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{ private SelectionVector4 vector4; - public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException{ + public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{ // we pass in the local hyperBatch since that is where we'll be reading data. 
- vector4 = hyperBatch.getSelectionVector4(); + Preconditions.checkArgument(vector4 != null); + this.vector4 = vector4; doSetup(context, hyperBatch, null); } @@ -59,7 +61,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{ return doEval(sv1, sv2); } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing); public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex); }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java index 2099a7c..dcb159c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java @@ -25,7 +25,7 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; public interface Sorter { - public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException; + public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException; public void sort(SelectionVector4 vector4, VectorContainer container); public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>(Sorter.class, SortTemplate.class); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 30a3d5a..8262691 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -159,7 +159,7 @@ public class SimpleParallelizer { // TODO: right now we'll just assume that each task is cost 1 so we'll set the 
breadth at the lesser of the number // of tasks or the maximum width of the fragment. if (diskCost < width) { - width = (int) diskCost; +// width = (int) diskCost; } width = Math.min(width, maxWidthPerEndpoint*allNodes.size()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 27d3de1..ec2d246 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -81,7 +81,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements @Override public TypedFieldId getValueVectorId(SchemaPath path) { - return container.getValueVector(path); + return container.getValueVectorId(path); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 8b6d51c..46c747c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -31,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector; * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids * provided utilizing getValueVectorId(); */ -public interface RecordBatch extends Iterable<VectorWrapper<?>> { +public interface 
RecordBatch extends VectorAccessible, Iterable<VectorWrapper<?>>{ /** * Describes the outcome of a RecordBatch being incremented forward. @@ -92,6 +92,7 @@ public interface RecordBatch extends Iterable<VectorWrapper<?>> { * TypedFieldId */ public abstract TypedFieldId getValueVectorId(SchemaPath path); + @Override public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz); /** http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index c6d73ea..4057c58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -34,7 +34,7 @@ import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Maps; -public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{ +public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapper<?>>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class); private VectorContainer container = new VectorContainer(); @@ -115,14 +115,14 @@ public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{ } public TypedFieldId getValueVectorId(SchemaPath path) { - return container.getValueVector(path); + return container.getValueVectorId(path); } // // @SuppressWarnings("unchecked") -// public <T extends ValueVector> T getValueVector(int fieldId, Class<?> clazz) { +// public <T extends ValueVector> T getValueVectorId(int fieldId, Class<?> clazz) { // ValueVector v = container.get(fieldId); // assert v != null; // if (v.getClass() != clazz){ 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java new file mode 100644 index 0000000..a8100b2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Created with IntelliJ IDEA. + * User: sphillips + * Date: 9/30/13 + * Time: 1:40 PM + * To change this template use File | Settings | File Templates. 
+ */ +public interface VectorAccessible extends Iterable<VectorWrapper<?>> { + public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz); + public TypedFieldId getValueVectorId(SchemaPath path); + public BatchSchema getSchema(); + public int getRecordCount(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 939245b..91aa70b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -28,12 +28,13 @@ import org.apache.drill.exec.vector.ValueVector; import com.beust.jcommander.internal.Lists; import com.google.common.base.Preconditions; -public class VectorContainer implements Iterable<VectorWrapper<?>> { +public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class); private final List<VectorWrapper<?>> wrappers = Lists.newArrayList(); private final List<VectorWrapper<?>> oldWrappers = Lists.newArrayList(); private BatchSchema schema; + private int recordCount = -1; public VectorContainer() { } @@ -119,7 +120,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> { throw new IllegalStateException("You attempted to remove a vector that didn't exist."); } - public TypedFieldId getValueVector(SchemaPath path) { + public TypedFieldId getValueVectorId(SchemaPath path) { for (int i = 0; i < wrappers.size(); i++) { VectorWrapper<?> va = wrappers.get(i); if (va.getField().matches(path)) @@ -129,8 +130,10 @@ public class VectorContainer implements 
Iterable<VectorWrapper<?>> { return null; } + + @Override @SuppressWarnings("unchecked") - public <T extends ValueVector> VectorWrapper<T> getValueAccessorById(int fieldId, Class<?> clazz) { + public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { VectorWrapper<?> va = wrappers.get(fieldId); assert va != null; if (va.getVectorClass() != clazz) { @@ -139,7 +142,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> { clazz.getCanonicalName(), va.getVectorClass().getCanonicalName())); return null; } - return (VectorWrapper<T>) va; + return (VectorWrapper) va; } public BatchSchema getSchema() { @@ -170,6 +173,16 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> { zeroVectors(); wrappers.clear(); } + + public void setRecordCount(int recordCount) { + this.recordCount = recordCount; + } + + @Override + public int getRecordCount() { + Preconditions.checkState(recordCount != -1, "Record count not set for this vector container"); + return recordCount; + } public void zeroVectors(){ for (VectorWrapper<?> w : wrappers) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java new file mode 100644 index 0000000..063c00a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util; + +import com.beust.jcommander.internal.Lists; +import org.apache.commons.lang.StringUtils; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.List; + +public class BatchPrinter { + public static void printHyperBatch(VectorAccessible batch) { + List<String> columns = Lists.newArrayList(); + List<ValueVector> vectors = Lists.newArrayList(); + int numBatches = 0; + for (VectorWrapper vw : batch) { + columns.add(vw.getValueVectors()[0].getField().getName()); + numBatches = vw.getValueVectors().length; + } + int width = columns.size(); + for (int i = 0; i < numBatches; i++) { + int rows = batch.iterator().next().getValueVectors()[i].getMetadata().getValueCount(); + for (int j = 0; j < rows; j++) { + for (VectorWrapper vw : batch) { + Object o = vw.getValueVectors()[i].getAccessor().getObject(j); + if (o instanceof byte[]) { + String value = new String((byte[]) o); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); + } else { + String value = o.toString(); + System.out.printf("| %-15s",value.length() <= 15 ? 
value : value.substring(0,14)); + } + } + System.out.printf("|\n"); + } + } + System.out.printf("|\n"); + } + public static void printBatch(VectorAccessible batch) { + List<String> columns = Lists.newArrayList(); + List<ValueVector> vectors = Lists.newArrayList(); + for (VectorWrapper vw : batch) { + columns.add(vw.getValueVector().getField().getName()); + vectors.add(vw.getValueVector()); + } + int width = columns.size(); + int rows = vectors.get(0).getMetadata().getValueCount(); + for (int row = 0; row < rows; row++) { + if (row%50 == 0) { + System.out.println(StringUtils.repeat("-", width * 17 + 1)); + for (String column : columns) { + System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14)); + } + System.out.printf("|\n"); + System.out.println(StringUtils.repeat("-", width*17 + 1)); + } + for (ValueVector vv : vectors) { + Object o = vv.getAccessor().getObject(row); + if (o instanceof byte[]) { + String value = new String((byte[]) o); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); + } else { + String value = o.toString(); + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14)); + } + } + System.out.printf("|\n"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java new file mode 100644 index 0000000..f089932 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.cache; + +import com.beust.jcommander.internal.Lists; +import com.hazelcast.core.MultiMap; +import com.hazelcast.nio.FastByteArrayInputStream; +import com.hazelcast.nio.FastByteArrayOutputStream; +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.vector.*; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.DataOutput; +import java.util.List; + +public class TestVectorCache { + + @Test + public void testVectorCache() throws Exception { + List<ValueVector> vectorList = Lists.newArrayList(); + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + DrillConfig config = DrillConfig.create(); + Drillbit 
bit = new Drillbit(config, serviceSet); + bit.run(); + DrillbitContext context = bit.getContext(); + HazelCache cache = new HazelCache(config); + cache.run(); + + MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT)); + IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator()); + MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY)); + VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator()); + AllocationHelper.allocate(intVector, 4, 4); + AllocationHelper.allocate(binVector, 4, 5); + vectorList.add(intVector); + vectorList.add(binVector); + + intVector.getMutator().set(0, 0); binVector.getMutator().set(0, "ZERO".getBytes()); + intVector.getMutator().set(1, 1); binVector.getMutator().set(1, "ONE".getBytes()); + intVector.getMutator().set(2, 2); binVector.getMutator().set(2, "TWO".getBytes()); + intVector.getMutator().set(3, 3); binVector.getMutator().set(3, "THREE".getBytes()); + intVector.getMutator().setValueCount(4); + binVector.getMutator().setValueCount(4); + + VectorWrap wrap = new VectorWrap(vectorList); + /* + FastByteArrayOutputStream out = new FastByteArrayOutputStream(); + wrap.writeData(out); + FastByteArrayInputStream in = new FastByteArrayInputStream(out.getBytes()); + VectorWrap newWrap = new VectorWrap(); + newWrap.readData(in); + */ + MultiMap<String, VectorWrap> mmap = cache.getMultiMap("testMap"); + mmap.put("vectors", wrap); + VectorWrap newWrap = mmap.get("vectors").iterator().next(); + + List<ValueVector> vectors = newWrap.get(); + for (ValueVector vv : vectors) { + int values = vv.getAccessor().getValueCount(); + for (int i = 0; i < values; i++) { + Object o = vv.getAccessor().getObject(i); + if (o instanceof byte[]) { + System.out.println(new String((byte[])o)); + } else { 
+ System.out.println(o); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java new file mode 100644 index 0000000..d12633e --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.fn.impl; + +import org.apache.drill.common.expression.ArgumentValidators; +import org.apache.drill.common.expression.CallProvider; +import org.apache.drill.common.expression.FunctionDefinition; +import org.apache.drill.common.expression.OutputTypeDeterminer; +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.Random; + +public class GeneratorFunctions { + + public static final Random random = new Random(1234L); + public static final FunctionDefinition RANDOM_BIG_INT = FunctionDefinition.simple("randomBigInt", new ArgumentValidators.NumericTypeAllowed(1,2, true), + OutputTypeDeterminer.FIXED_BIGINT, "randomBigInt"); + public static final FunctionDefinition RANDOM_FLOAT8 = FunctionDefinition.simple("randomFloat8", new ArgumentValidators.NumericTypeAllowed(1,2, true), + OutputTypeDeterminer.FIXED_FLOAT8, "randomFloat8"); + + public static class Provider implements CallProvider { + + @Override + public FunctionDefinition[] getFunctionDefintions() { + return new FunctionDefinition[] { RANDOM_BIG_INT, + RANDOM_FLOAT8 }; + } + + } + + @FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL) + public static class RandomBigIntGauss implements DrillSimpleFunc { + + @Param BigIntHolder range; + @Output BigIntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = (long)(org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextGaussian() * range.value); + } + } + + @FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls = 
FunctionTemplate.NullHandling.NULL_IF_NULL) + public static class RandomBigInt implements DrillSimpleFunc { + + @Param BigIntHolder min; + @Param BigIntHolder max; + @Output BigIntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = (long)(org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextFloat() * (max.value - min.value) + min.value); + } + } + + @FunctionTemplate(name = "randomFloat8", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL) + public static class RandomFloat8Gauss implements DrillSimpleFunc { + + @Param BigIntHolder range; + @Output + Float8Holder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextGaussian() * range.value; + } + } + + @FunctionTemplate(name = "randomFloat8", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.NULL_IF_NULL) + public static class RandomFloat8 implements DrillSimpleFunc { + + @Param BigIntHolder min; + @Param BigIntHolder max; + @Output Float8Holder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextFloat() * (max.value - min.value) + min.value; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java index bbe1c18..0cc09b4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java 
@@ -38,6 +38,8 @@ import com.google.common.io.Files; public class TestHashToRandomExchange extends PopUnitTestBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashToRandomExchange.class); + //TODO: re-enable this test once fix for hash partition assignments is included + @Ignore @Test public void twoBitTwoExchangeTwoEntryRun() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java new file mode 100644 index 0000000..850a40a --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
package org.apache.drill.exec.physical.impl.orderedpartitioner;

import com.beust.jcommander.internal.Lists;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.commons.math.stat.descriptive.moment.Mean;
import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.util.BatchPrinter;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;


/**
 * End-to-end test of the ordered-partition exchange.  Runs the physical plan
 * in /sender/ordered_exchange.json on a two-Drillbit cluster and verifies:
 * rows arrive sorted by (col1 ASC, col2 DESC), the per-partition record counts
 * are roughly even (std dev < 10% of mean), and all 31000 mock-scan records
 * (1000 + 5000 + 10000 + 15000) make it to the screen.
 */
public class TestOrderedPartitionExchange extends PopUnitTestBase {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOrderedPartitionExchange.class);

  @Test
  public void twoBitTwoExchangeTwoEntryRun() throws Exception {
    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();

    try (Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
         Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
         DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {

      bit1.run();
      bit2.run();
      client.connect();
      List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL,
          Files.toString(FileUtils.getResourceAsFile("/sender/ordered_exchange.json"), Charsets.UTF_8));
      int count = 0;
      List<Integer> partitionRecordCounts = Lists.newArrayList();
      // One context/allocator for all batches; the original constructed a
      // BootStrapContext per result batch, which is needlessly expensive.
      BootStrapContext bootstrapContext = new BootStrapContext(DrillConfig.create());
      for (QueryResultBatch b : results) {
        if (b.getData() == null) {
          continue;
        }
        int rows = b.getHeader().getRowCount();
        count += rows;
        RecordBatchLoader loader = new RecordBatchLoader(bootstrapContext.getAllocator());
        loader.load(b.getHeader().getDef(), b.getData());
        BigIntVector vv1 = (BigIntVector) loader.getValueAccessorById(loader.getValueVectorId(
            new SchemaPath("col1", ExpressionPosition.UNKNOWN)).getFieldId(), BigIntVector.class).getValueVector();
        Float8Vector vv2 = (Float8Vector) loader.getValueAccessorById(loader.getValueVectorId(
            new SchemaPath("col2", ExpressionPosition.UNKNOWN)).getFieldId(), Float8Vector.class).getValueVector();
        IntVector pVector = (IntVector) loader.getValueAccessorById(loader.getValueVectorId(
            new SchemaPath("partition", ExpressionPosition.UNKNOWN)).getFieldId(), IntVector.class).getValueVector();
        long previous1 = Long.MIN_VALUE;
        // +infinity-like sentinel for the DESC check on col2.  The original
        // used Double.MIN_VALUE, which is the smallest POSITIVE double, not
        // the most negative value — the wrong neutral element for "<=".
        double previous2 = Double.MAX_VALUE;
        int partPrevious = -1;
        long current1 = Long.MIN_VALUE;
        double current2 = Double.MAX_VALUE;
        int partCurrent = -1;
        int partitionRecordCount = 0;
        for (int i = 0; i < rows; i++) {
          previous1 = current1;
          previous2 = current2;
          partPrevious = partCurrent;
          current1 = vv1.getAccessor().get(i);
          current2 = vv2.getAccessor().get(i);
          partCurrent = pVector.getAccessor().get(i);
          // Primary key ascending; on ties, secondary key descending.
          Assert.assertTrue(current1 >= previous1);
          if (current1 == previous1) {
            Assert.assertTrue(current2 <= previous2);
          }
          if (partCurrent == partPrevious || partPrevious == -1) {
            partitionRecordCount++;
          } else {
            partitionRecordCounts.add(partitionRecordCount);
            // Start the new partition's count at 1 so the current record is
            // counted; the original reset to 0, dropping one record from each
            // partition after the first.
            partitionRecordCount = 1;
          }
        }
        partitionRecordCounts.add(partitionRecordCount);
      }
      double[] values = new double[partitionRecordCounts.size()];
      int i = 0;
      for (Integer rc : partitionRecordCounts) {
        values[i++] = rc.doubleValue();
      }
      StandardDeviation stdDev = new StandardDeviation();
      Mean mean = new Mean();
      double std = stdDev.evaluate(values);
      double m = mean.evaluate(values);
      logger.debug("mean: {} std dev: {}", m, std);
      // Records should be spread roughly evenly across partitions.
      Assert.assertTrue(std < 0.1 * m);
      // Total of the four mock-scan entries: 1000 + 5000 + 10000 + 15000.
      assertEquals(31000, count);
    }
  }

}
new TestName() : new Timeout(25000); + @BeforeClass public static void setup() { CONFIG = DrillConfig.create(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/resources/sender/hash_exchange.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/sender/hash_exchange.json b/exec/java-exec/src/test/resources/sender/hash_exchange.json index 3454361..d8f9579 100644 --- a/exec/java-exec/src/test/resources/sender/hash_exchange.json +++ b/exec/java-exec/src/test/resources/sender/hash_exchange.json @@ -15,12 +15,12 @@ {records: 100, types: [ {name: "blue", type: "INT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"}, - {name: "green", type: "INT", mode: "REQUIRED"} + {name: "green", type: "VARBINARY", mode: "REQUIRED"} ]}, {records: 100, types: [ {name: "blue", type: "INT", mode: "REQUIRED"}, {name: "red", type: "BIGINT", mode: "REQUIRED"}, - {name: "green", type: "INT", mode: "REQUIRED"} + {name: "green", type: "VARBINARY", mode: "REQUIRED"} ]} ] }, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/resources/sender/hash_exchange2.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/sender/hash_exchange2.json b/exec/java-exec/src/test/resources/sender/hash_exchange2.json new file mode 100644 index 0000000..844dea7 --- /dev/null +++ b/exec/java-exec/src/test/resources/sender/hash_exchange2.json @@ -0,0 +1,47 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + {pop : "parquet-scan", + @id : 1, + entries : [ { + path : "/Users/sphillips/tpc-h/supplier" + } ], + storageengine : { + type : "parquet", + dfsName : "file:///" + }, + ref : "_MAP", + fragmentPointer : 0 + }, + { + @id:2, + child: 1, + pop:"project", + exprs: [ + { ref: "suppkey", expr:"_MAP.S_SUPPKEY"} + ] + }, + { 
+ @id: 3, + child: 2, + pop: "hash-to-random-exchange", + expr: "hash(suppkey)" + }, + { + @id: 4, + child: 3, + pop: "union-exchange" + }, + { + @id: 5, + child: 4, + pop: "screen" + } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/resources/sender/ordered_exchange.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/sender/ordered_exchange.json b/exec/java-exec/src/test/resources/sender/ordered_exchange.json new file mode 100644 index 0000000..1b3d41e --- /dev/null +++ b/exec/java-exec/src/test/resources/sender/ordered_exchange.json @@ -0,0 +1,74 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-scan", + url: "http://apache.org", + entries:[ + {records: 1000, types: [ + {name: "green", type: "VARBINARY", mode: "REQUIRED"} + ]}, + {records: 5000, types: [ + {name: "green", type: "VARBINARY", mode: "REQUIRED"} + ]}, + {records: 10000, types: [ + {name: "green", type: "VARBINARY", mode: "REQUIRED"} + ]}, + {records: 15000, types: [ + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:2, + child: 1, + pop:"project", + exprs: [ + { ref: "col1", expr:"randomBigInt(5)"}, + { ref: "col2", expr:"randomFloat8(1000)"} + ] + }, + { + @id: 3, + child: 2, + pop: "ordered-partition-exchange", + orderings: [ + {expr: "col1", order: "ASC"}, + {expr: "col2", order: "DESC"} + ], + ref: "partition" + }, + { + @id: 4, + child: 3, + pop: "union-exchange" + }, + { + @id:5, + child: 4, + pop:"sort", + orderings: [ + {expr: "partition"}, + {expr: "col1"}, + {expr: "col2", order: "DESC"} + ] + }, + { + @id:6, + child: 5, + pop:"selection-vector-remover" + }, + { + @id: 7, + child: 6, + pop: "screen" + } + ] +} \ No newline at end of file
