Repository: incubator-drill
Updated Branches:
  refs/heads/master 01bf8496b -> c8a08c3e7


DRILL-847: Handle different schemas for Merging receiver.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/602f8173
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/602f8173
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/602f8173

Branch: refs/heads/master
Commit: 602f8173ee9852bb386b5b1751b919396e63387c
Parents: 01bf849
Author: Jinfeng Ni <[email protected]>
Authored: Tue May 27 11:07:36 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed May 28 13:15:52 2014 -0700

----------------------------------------------------------------------
 .../impl/mergereceiver/MergingRecordBatch.java  | 29 ++++++++++++++++++++
 .../drill/exec/record/RecordBatchLoader.java    | 14 ++++++++++
 .../drill/exec/record/VectorContainer.java      | 26 +++++++++++++++++-
 3 files changed, 68 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/602f8173/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index e3f466a..9101202 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -61,6 +61,9 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.eigenbase.rel.RelFieldCollation.Direction;
 
+import parquet.Preconditions;
+
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.sun.codemodel.JArray;
 import com.sun.codemodel.JClass;
@@ -199,6 +202,18 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         ++i;
       }
 
+      // Canonicalize each incoming batch, so that vectors are alphabetically 
sorted based on SchemaPath.
+      for (RecordBatchLoader loader : batchLoaders) {
+        loader.canonicalize();
+      }
+
+      // Ensure all the incoming batches have the identical schema.
+      if (!isSameSchemaAmongBatches(batchLoaders)) {
+        logger.error("Incoming batches for merging receiver have diffferent 
schemas!");
+        context.fail(new SchemaChangeException("Incoming batches for merging 
receiver have diffferent schemas!"));
+        return IterOutcome.STOP;
+      }
+
       // create the outgoing schema and vector container, and allocate the 
initial batch
       SchemaBuilder bldr = 
BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
       int vectorCount = 0;
@@ -397,6 +412,20 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     return WritableBatch.get(this);
   }
 
+  private boolean isSameSchemaAmongBatches(RecordBatchLoader[] batchLoaders) {
+    Preconditions.checkArgument(batchLoaders.length > 0, "0 batch is not 
allowed!");
+
+    BatchSchema schema = batchLoaders[0].getSchema();
+
+    for (int i = 1; i < batchLoaders.length; i++) {
+      if (!schema.equals(batchLoaders[i].getSchema())) {
+        logger.error("Schemas are different. Schema 1 : " + schema + ", Schema 
2: " + batchLoaders[i].getSchema() );
+        return false;
+      }
+    }
+    return true;
+  }
+
   private void allocateOutgoing() {
     for (VectorAllocator allocator : allocators) {
       allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/602f8173/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 10d959f..e32fda9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -158,4 +158,18 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
     container.clear();
   }
 
+  public void canonicalize() {
+    //logger.debug( "RecordBatchLoader : before schema " + schema);
+    container = VectorContainer.canonicalize(container);
+
+    // rebuild the schema.
+    SchemaBuilder b = BatchSchema.newBuilder();
+    for(VectorWrapper<?> v : container){
+      b.addField(v.getField());
+    }
+    b.setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+    this.schema = b.build();
+
+    //logger.debug( "RecordBatchLoader : after schema " + schema);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/602f8173/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index dd62c67..3c67466 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -18,6 +18,9 @@
 package org.apache.drill.exec.record;
 
 import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 
@@ -85,6 +88,25 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
     return vc;
   }
 
+  public static VectorContainer canonicalize(VectorContainer original) {
+    VectorContainer vc = new VectorContainer();
+
+    List<VectorWrapper<?>> canonicalWrappers = new 
ArrayList<VectorWrapper<?>>(original.wrappers);
+
+    // Sort list of VectorWrapper alphabetically based on SchemaPath.
+    Collections.sort(canonicalWrappers, new Comparator<VectorWrapper<?>>() {
+      public int compare(VectorWrapper<?> v1, VectorWrapper<?> v2) {
+        return 
v1.getField().getPath().toExpr().compareTo(v2.getField().getPath().toExpr());
+      }
+    });
+
+    for (VectorWrapper<?> w : canonicalWrappers) {
+      vc.add(w.getValueVector());
+    }
+    return vc;
+  }
+
+
   private void cloneAndTransfer(VectorWrapper<?> wrapper) {
     wrappers.add(wrapper.cloneAndTransfer());
   }
@@ -160,7 +182,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
           clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
     }
 
-    return (VectorWrapper<?>) va.getChildWrapper(fieldIds);
+    return va.getChildWrapper(fieldIds);
 
   }
 
@@ -212,4 +234,6 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
   public int getNumberOfColumns() {
     return this.wrappers.size();
   }
+
+
 }

Reply via email to