http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
index 43ed7e4..cb5e20e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.sort;
 
 import javax.inject.Named;
 
+import com.google.common.base.Preconditions;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
@@ -33,9 +34,10 @@ public abstract class SortTemplate implements Sorter, 
IndexedSortable{
   private SelectionVector4 vector4;
   
   
-  public void setup(FragmentContext context, RecordBatch hyperBatch) throws 
SchemaChangeException{
+  public void setup(FragmentContext context, SelectionVector4 vector4, 
VectorContainer hyperBatch) throws SchemaChangeException{
     // we pass in the local hyperBatch since that is where we'll be reading 
data.
-    vector4 = hyperBatch.getSelectionVector4();
+    Preconditions.checkArgument(vector4 != null);
+    this.vector4 = vector4;
     doSetup(context, hyperBatch, null);
   }
   
@@ -59,7 +61,7 @@ public abstract class SortTemplate implements Sorter, 
IndexedSortable{
     return doEval(sv1, sv2);
   }
 
-  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch 
outgoing);
+  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch 
outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, 
@Named("rightIndex") int rightIndex);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
index 2099a7c..dcb159c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java
@@ -25,7 +25,7 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public interface Sorter {
-  public void setup(FragmentContext context, RecordBatch hyperBatch) throws 
SchemaChangeException;
+  public void setup(FragmentContext context, SelectionVector4 vector4, 
VectorContainer hyperBatch) throws SchemaChangeException;
   public void sort(SelectionVector4 vector4, VectorContainer container);
   
   public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<Sorter>(Sorter.class, SortTemplate.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 30a3d5a..8262691 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -159,7 +159,7 @@ public class SimpleParallelizer {
       // TODO: right now we'll just assume that each task is cost 1 so we'll 
set the breadth at the lesser of the number
       // of tasks or the maximum width of the fragment.
       if (diskCost < width) {
-        width = (int) diskCost;
+//        width = (int) diskCost;
       }
 
       width = Math.min(width, maxWidthPerEndpoint*allNodes.size());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 27d3de1..ec2d246 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -81,7 +81,7 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
 
   @Override
   public TypedFieldId getValueVectorId(SchemaPath path) {
-    return container.getValueVector(path);
+    return container.getValueVectorId(path);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 8b6d51c..46c747c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.vector.ValueVector;
  * A key thing to know is that the Iterator provided by record batch must 
align with the rank positions of the field ids
  * provided utilizing getValueVectorId();
  */
-public interface RecordBatch extends Iterable<VectorWrapper<?>> {
+public interface RecordBatch extends VectorAccessible, 
Iterable<VectorWrapper<?>>{
 
   /**
    * Describes the outcome of a RecordBatch being incremented forward.
@@ -92,6 +92,7 @@ public interface RecordBatch extends 
Iterable<VectorWrapper<?>> {
    *         TypedFieldId
    */
   public abstract TypedFieldId getValueVectorId(SchemaPath path);
+  @Override
   public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> 
clazz);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index c6d73ea..4057c58 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Maps;
 
-public class RecordBatchLoader implements Iterable<VectorWrapper<?>>{
+public class RecordBatchLoader implements VectorAccessible, 
Iterable<VectorWrapper<?>>{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RecordBatchLoader.class);
 
   private VectorContainer container = new VectorContainer();
@@ -115,14 +115,14 @@ public class RecordBatchLoader implements 
Iterable<VectorWrapper<?>>{
   }
 
   public TypedFieldId getValueVectorId(SchemaPath path) {
-    return container.getValueVector(path);
+    return container.getValueVectorId(path);
   }
   
   
   
 //  
 //  @SuppressWarnings("unchecked")
-//  public <T extends ValueVector> T getValueVector(int fieldId, Class<?> 
clazz) {
+//  public <T extends ValueVector> T getValueVectorId(int fieldId, Class<?> 
clazz) {
 //    ValueVector v = container.get(fieldId);
 //    assert v != null;
 //    if (v.getClass() != clazz){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
new file mode 100644
index 0000000..a8100b2
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.vector.ValueVector;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: sphillips
+ * Date: 9/30/13
+ * Time: 1:40 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
+  public TypedFieldId getValueVectorId(SchemaPath path);
+  public BatchSchema getSchema();
+  public int getRecordCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 939245b..91aa70b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -28,12 +28,13 @@ import org.apache.drill.exec.vector.ValueVector;
 import com.beust.jcommander.internal.Lists;
 import com.google.common.base.Preconditions;
 
-public class VectorContainer implements Iterable<VectorWrapper<?>> {
+public class VectorContainer implements Iterable<VectorWrapper<?>>, 
VectorAccessible {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
 
   private final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
   private final List<VectorWrapper<?>> oldWrappers = Lists.newArrayList();
   private BatchSchema schema;
+  private int recordCount = -1;
 
   public VectorContainer() {
   }
@@ -119,7 +120,7 @@ public class VectorContainer implements 
Iterable<VectorWrapper<?>> {
     throw new IllegalStateException("You attempted to remove a vector that 
didn't exist.");
   }
 
-  public TypedFieldId getValueVector(SchemaPath path) {
+  public TypedFieldId getValueVectorId(SchemaPath path) {
     for (int i = 0; i < wrappers.size(); i++) {
       VectorWrapper<?> va = wrappers.get(i);
       if (va.getField().matches(path))
@@ -129,8 +130,10 @@ public class VectorContainer implements 
Iterable<VectorWrapper<?>> {
     return null;
   }
 
+
+  @Override
   @SuppressWarnings("unchecked")
-  public <T extends ValueVector> VectorWrapper<T> getValueAccessorById(int 
fieldId, Class<?> clazz) {
+  public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
     VectorWrapper<?> va = wrappers.get(fieldId);
     assert va != null;
     if (va.getVectorClass() != clazz) {
@@ -139,7 +142,7 @@ public class VectorContainer implements 
Iterable<VectorWrapper<?>> {
           clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
       return null;
     }
-    return (VectorWrapper<T>) va;
+    return (VectorWrapper) va;
   }
 
   public BatchSchema getSchema() {
@@ -170,6 +173,16 @@ public class VectorContainer implements 
Iterable<VectorWrapper<?>> {
     zeroVectors();
     wrappers.clear();
   }
+
+  public void setRecordCount(int recordCount) {
+    this.recordCount = recordCount;
+  }
+
+  @Override
+  public int getRecordCount() {
+    Preconditions.checkState(recordCount != -1, "Record count not set for this 
vector container");
+    return recordCount;
+  }
   
   public void zeroVectors(){
     for (VectorWrapper<?> w : wrappers) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
new file mode 100644
index 0000000..063c00a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.List;
+
+public class BatchPrinter {
+  public static void printHyperBatch(VectorAccessible batch) {
+    List<String> columns = Lists.newArrayList();
+    List<ValueVector> vectors = Lists.newArrayList();
+    int numBatches = 0;
+    for (VectorWrapper vw : batch) {
+      columns.add(vw.getValueVectors()[0].getField().getName());
+      numBatches = vw.getValueVectors().length;
+    }
+    int width = columns.size();
+      for (int i = 0; i < numBatches; i++) {
+        int rows = 
batch.iterator().next().getValueVectors()[i].getMetadata().getValueCount();
+        for (int j = 0; j < rows; j++) {
+          for (VectorWrapper vw : batch) {
+            Object o = vw.getValueVectors()[i].getAccessor().getObject(j);
+            if (o instanceof byte[]) {
+              String value = new String((byte[]) o);
+              System.out.printf("| %-15s",value.length() <= 15 ? value : 
value.substring(0, 14));
+            } else {
+              String value = o.toString();
+              System.out.printf("| %-15s",value.length() <= 15 ? value : 
value.substring(0,14));
+            }
+          }
+          System.out.printf("|\n");
+        }
+      }
+      System.out.printf("|\n");
+  }
+  public static void printBatch(VectorAccessible batch) {
+    List<String> columns = Lists.newArrayList();
+    List<ValueVector> vectors = Lists.newArrayList();
+    for (VectorWrapper vw : batch) {
+      columns.add(vw.getValueVector().getField().getName());
+      vectors.add(vw.getValueVector());
+    }
+    int width = columns.size();
+    int rows = vectors.get(0).getMetadata().getValueCount();
+    for (int row = 0; row < rows; row++) {
+      if (row%50 == 0) {
+        System.out.println(StringUtils.repeat("-", width * 17 + 1));
+        for (String column : columns) {
+          System.out.printf("| %-15s", width <= 15 ? column : 
column.substring(0, 14));
+        }
+        System.out.printf("|\n");
+        System.out.println(StringUtils.repeat("-", width*17 + 1));
+      }
+      for (ValueVector vv : vectors) {
+        Object o = vv.getAccessor().getObject(row);
+        if (o instanceof byte[]) {
+          String value = new String((byte[]) o);
+          System.out.printf("| %-15s",value.length() <= 15 ? value : 
value.substring(0, 14));
+        } else {
+          String value = o.toString();
+          System.out.printf("| %-15s",value.length() <= 15 ? value : 
value.substring(0,14));
+        }
+      }
+      System.out.printf("|\n");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
new file mode 100644
index 0000000..f089932
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.beust.jcommander.internal.Lists;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.nio.FastByteArrayInputStream;
+import com.hazelcast.nio.FastByteArrayOutputStream;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.*;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataOutput;
+import java.util.List;
+
+public class TestVectorCache {
+
+  @Test
+  public void testVectorCache() throws Exception {
+    List<ValueVector> vectorList = Lists.newArrayList();
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+    DrillConfig config = DrillConfig.create();
+    Drillbit bit = new Drillbit(config, serviceSet);
+    bit.run();
+    DrillbitContext context = bit.getContext();
+    HazelCache cache = new HazelCache(config);
+    cache.run();
+
+    MaterializedField intField = MaterializedField.create(new 
SchemaPath("int", ExpressionPosition.UNKNOWN), 
Types.required(TypeProtos.MinorType.INT));
+    IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, 
context.getAllocator());
+    MaterializedField binField = MaterializedField.create(new 
SchemaPath("binary", ExpressionPosition.UNKNOWN), 
Types.required(TypeProtos.MinorType.VARBINARY));
+    VarBinaryVector binVector = 
(VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
+    AllocationHelper.allocate(intVector, 4, 4);
+    AllocationHelper.allocate(binVector, 4, 5);
+    vectorList.add(intVector);
+    vectorList.add(binVector);
+
+    intVector.getMutator().set(0, 0); binVector.getMutator().set(0, 
"ZERO".getBytes());
+    intVector.getMutator().set(1, 1); binVector.getMutator().set(1, 
"ONE".getBytes());
+    intVector.getMutator().set(2, 2); binVector.getMutator().set(2, 
"TWO".getBytes());
+    intVector.getMutator().set(3, 3); binVector.getMutator().set(3, 
"THREE".getBytes());
+    intVector.getMutator().setValueCount(4);
+    binVector.getMutator().setValueCount(4);
+
+    VectorWrap wrap = new VectorWrap(vectorList);
+    /*
+    FastByteArrayOutputStream out = new FastByteArrayOutputStream();
+    wrap.writeData(out);
+    FastByteArrayInputStream in = new FastByteArrayInputStream(out.getBytes());
+    VectorWrap newWrap = new VectorWrap();
+    newWrap.readData(in);
+    */
+    MultiMap<String, VectorWrap> mmap = cache.getMultiMap("testMap");
+    mmap.put("vectors", wrap);
+    VectorWrap newWrap = mmap.get("vectors").iterator().next();
+
+    List<ValueVector> vectors = newWrap.get();
+    for (ValueVector vv : vectors) {
+      int values = vv.getAccessor().getValueCount();
+      for (int i = 0; i < values; i++) {
+        Object o = vv.getAccessor().getObject(i);
+        if (o instanceof byte[]) {
+          System.out.println(new String((byte[])o));
+        } else {
+          System.out.println(o);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
new file mode 100644
index 0000000..d12633e
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/GeneratorFunctions.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.fn.impl;
+
+import org.apache.drill.common.expression.ArgumentValidators;
+import org.apache.drill.common.expression.CallProvider;
+import org.apache.drill.common.expression.FunctionDefinition;
+import org.apache.drill.common.expression.OutputTypeDeterminer;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.Random;
+
+public class GeneratorFunctions {
+
+  public static final Random random = new Random(1234L);
+  public static final FunctionDefinition RANDOM_BIG_INT = 
FunctionDefinition.simple("randomBigInt", new 
ArgumentValidators.NumericTypeAllowed(1,2, true),
+          OutputTypeDeterminer.FIXED_BIGINT, "randomBigInt");
+  public static final FunctionDefinition RANDOM_FLOAT8 = 
FunctionDefinition.simple("randomFloat8", new 
ArgumentValidators.NumericTypeAllowed(1,2, true),
+          OutputTypeDeterminer.FIXED_FLOAT8, "randomFloat8");
+
+  public static class Provider implements CallProvider {
+
+    @Override
+    public FunctionDefinition[] getFunctionDefintions() {
+      return new FunctionDefinition[] { RANDOM_BIG_INT,
+                                        RANDOM_FLOAT8 };
+    }
+
+  }
+
+  @FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls 
= FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class RandomBigIntGauss implements DrillSimpleFunc {
+
+    @Param BigIntHolder range;
+    @Output BigIntHolder out;
+
+    public void setup(RecordBatch incoming) {
+    }
+
+    public void eval() {
+      out.value = 
(long)(org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextGaussian() * 
range.value);
+    }
+  }
+
+  @FunctionTemplate(name = "randomBigInt", scope = FunctionScope.SIMPLE, nulls 
= FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class RandomBigInt implements DrillSimpleFunc {
+
+    @Param BigIntHolder min;
+    @Param BigIntHolder max;
+    @Output BigIntHolder out;
+
+    public void setup(RecordBatch incoming) {
+    }
+
+    public void eval() {
+      out.value = 
(long)(org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextFloat() * 
(max.value - min.value) + min.value);
+    }
+  }
+
+  @FunctionTemplate(name = "randomFloat8", scope = FunctionScope.SIMPLE, nulls 
= FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class RandomFloat8Gauss implements DrillSimpleFunc {
+
+    @Param BigIntHolder range;
+    @Output
+    Float8Holder out;
+
+    public void setup(RecordBatch incoming) {
+    }
+
+    public void eval() {
+      out.value = 
org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextGaussian() * 
range.value;
+    }
+  }
+
+  @FunctionTemplate(name = "randomFloat8", scope = FunctionScope.SIMPLE, nulls 
= FunctionTemplate.NullHandling.NULL_IF_NULL)
+  public static class RandomFloat8 implements DrillSimpleFunc {
+
+    @Param BigIntHolder min;
+    @Param BigIntHolder max;
+    @Output Float8Holder out;
+
+    public void setup(RecordBatch incoming) {
+    }
+
+    public void eval() {
+      out.value = 
org.apache.drill.exec.fn.impl.GeneratorFunctions.random.nextFloat() * 
(max.value - min.value) + min.value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
index bbe1c18..0cc09b4 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java
@@ -38,6 +38,8 @@ import com.google.common.io.Files;
 public class TestHashToRandomExchange extends PopUnitTestBase {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestHashToRandomExchange.class);
 
+  //Todo reenable this test once fix for has partition assignments is included
+  @Ignore
   @Test
   public void twoBitTwoExchangeTwoEntryRun() throws Exception {
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
new file mode 100644
index 0000000..850a40a
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/orderedpartitioner/TestOrderedPartitionExchange.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.orderedpartitioner;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.commons.math.stat.descriptive.moment.Mean;
+import org.apache.commons.math.stat.descriptive.moment.StandardDeviation;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestOrderedPartitionExchange extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestOrderedPartitionExchange.class);
+
+  @Test
+  public void twoBitTwoExchangeTwoEntryRun() throws Exception {
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet);
+        Drillbit bit2 = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, 
serviceSet.getCoordinator());) {
+
+      bit1.run();
+      bit2.run();
+      client.connect();
+      List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL,
+          
Files.toString(FileUtils.getResourceAsFile("/sender/ordered_exchange.json"),
+              Charsets.UTF_8));
+      int count = 0;
+      List<Integer> partitionRecordCounts = Lists.newArrayList();
+      for(QueryResultBatch b : results) {
+        if (b.getData() != null) {
+          int rows = b.getHeader().getRowCount();
+          count += rows;
+          RecordBatchLoader loader = new RecordBatchLoader(new 
BootStrapContext(DrillConfig.create()).getAllocator());
+          loader.load(b.getHeader().getDef(), b.getData());
+          BigIntVector vv1 = 
(BigIntVector)loader.getValueAccessorById(loader.getValueVectorId(
+                  new SchemaPath("col1", 
ExpressionPosition.UNKNOWN)).getFieldId(), BigIntVector.class).getValueVector();
+          Float8Vector vv2 = 
(Float8Vector)loader.getValueAccessorById(loader.getValueVectorId(
+                  new SchemaPath("col2", 
ExpressionPosition.UNKNOWN)).getFieldId(), Float8Vector.class).getValueVector();
+          IntVector pVector = 
(IntVector)loader.getValueAccessorById(loader.getValueVectorId(
+                  new SchemaPath("partition", 
ExpressionPosition.UNKNOWN)).getFieldId(), IntVector.class).getValueVector();
+          long previous1 = Long.MIN_VALUE;
+          double previous2 = Double.MIN_VALUE;
+          int partPrevious = -1;
+          long current1 = Long.MIN_VALUE;
+          double current2 = Double.MIN_VALUE;
+          int partCurrent = -1;
+          int partitionRecordCount = 0;
+          for (int i = 0; i < rows; i++) {
+            previous1 = current1;
+            previous2 = current2;
+            partPrevious = partCurrent;
+            current1 = vv1.getAccessor().get(i);
+            current2 = vv2.getAccessor().get(i);
+            partCurrent = pVector.getAccessor().get(i);
+            Assert.assertTrue(current1 >= previous1);
+            if (current1 == previous1) {
+              Assert.assertTrue(current2 <= previous2);
+            }
+            if (partCurrent == partPrevious || partPrevious == -1) {
+              partitionRecordCount++;
+            } else {
+              partitionRecordCounts.add(partitionRecordCount);
+              partitionRecordCount = 0;
+            }
+          }
+          partitionRecordCounts.add(partitionRecordCount);
+        }
+      }
+      double[] values = new double[partitionRecordCounts.size()];
+      int i = 0;
+      for (Integer rc : partitionRecordCounts) {
+        values[i++] = rc.doubleValue();
+      }
+      StandardDeviation stdDev = new StandardDeviation();
+      Mean mean = new Mean();
+      double std = stdDev.evaluate(values);
+      double m = mean.evaluate(values);
+      System.out.println("mean: " + m + " std dev: " + std);
+      Assert.assertTrue(std < 0.1 * m);
+      assertEquals(31000, count);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
index 294a4f0..78f7e43 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -45,8 +45,8 @@ public abstract class PopUnitTestBase {
   protected static DrillConfig CONFIG;
 
   // Set a timeout unless we're debugging.
-  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new 
Timeout(10000);
-  
+  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new 
Timeout(25000);
+
   @BeforeClass
   public static void setup() {
     CONFIG = DrillConfig.create();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/resources/sender/hash_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/hash_exchange.json 
b/exec/java-exec/src/test/resources/sender/hash_exchange.json
index 3454361..d8f9579 100644
--- a/exec/java-exec/src/test/resources/sender/hash_exchange.json
+++ b/exec/java-exec/src/test/resources/sender/hash_exchange.json
@@ -15,12 +15,12 @@
               {records: 100, types: [
                 {name: "blue", type: "INT", mode: "REQUIRED"},
                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
-                {name: "green", type: "INT", mode: "REQUIRED"}
+                {name: "green", type: "VARBINARY", mode: "REQUIRED"}
               ]},
               {records: 100, types: [
                 {name: "blue", type: "INT", mode: "REQUIRED"},
                 {name: "red", type: "BIGINT", mode: "REQUIRED"},
-                {name: "green", type: "INT", mode: "REQUIRED"}
+                {name: "green", type: "VARBINARY", mode: "REQUIRED"}
               ]}
             ]
         },

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/resources/sender/hash_exchange2.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/hash_exchange2.json 
b/exec/java-exec/src/test/resources/sender/hash_exchange2.json
new file mode 100644
index 0000000..844dea7
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/hash_exchange2.json
@@ -0,0 +1,47 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+    {pop : "parquet-scan",
+        @id : 1,
+        entries : [ {
+          path : "/Users/sphillips/tpc-h/supplier"
+        } ],
+        storageengine : {
+          type : "parquet",
+          dfsName : "file:///"
+        },
+        ref : "_MAP",
+        fragmentPointer : 0
+      },
+      {
+                  @id:2,
+                  child: 1,
+                  pop:"project",
+                  exprs: [
+                    { ref: "suppkey", expr:"_MAP.S_SUPPKEY"}
+                  ]
+              },
+         {
+            @id: 3,
+            child: 2,
+            pop: "hash-to-random-exchange",
+            expr: "hash(suppkey)"
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop: "union-exchange"
+        },
+        {
+            @id: 5,
+            child: 4,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5ca503c1/exec/java-exec/src/test/resources/sender/ordered_exchange.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/sender/ordered_exchange.json 
b/exec/java-exec/src/test/resources/sender/ordered_exchange.json
new file mode 100644
index 0000000..1b3d41e
--- /dev/null
+++ b/exec/java-exec/src/test/resources/sender/ordered_exchange.json
@@ -0,0 +1,74 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+    {
+                @id:1,
+                pop:"mock-scan",
+                url: "http://apache.org";,
+                entries:[
+                  {records: 1000, types: [
+                    {name: "green", type: "VARBINARY", mode: "REQUIRED"}
+                  ]},
+                  {records: 5000, types: [
+                    {name: "green", type: "VARBINARY", mode: "REQUIRED"}
+                  ]},
+                  {records: 10000, types: [
+                    {name: "green", type: "VARBINARY", mode: "REQUIRED"}
+                  ]},
+                  {records: 15000, types: [
+                    {name: "green", type: "INT", mode: "REQUIRED"}
+                  ]}
+                ]
+            },
+      {
+                  @id:2,
+                  child: 1,
+                  pop:"project",
+                  exprs: [
+                    { ref: "col1", expr:"randomBigInt(5)"},
+                    { ref: "col2", expr:"randomFloat8(1000)"}
+                  ]
+              },
+         {
+            @id: 3,
+            child: 2,
+            pop: "ordered-partition-exchange",
+            orderings: [
+              {expr: "col1", order: "ASC"},
+              {expr: "col2", order: "DESC"}
+            ],
+            ref: "partition"
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop: "union-exchange"
+        },
+        {
+            @id:5,
+            child: 4,
+            pop:"sort",
+            orderings: [
+              {expr: "partition"},
+              {expr: "col1"},
+              {expr: "col2", order: "DESC"}
+            ]
+        },
+        {
+            @id:6,
+            child: 5,
+            pop:"selection-vector-remover"
+        },
+        {
+            @id: 7,
+            child: 6,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

Reply via email to