Reduce the likelihood of phantom schema changes in distributed query plans.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/98bc9e19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/98bc9e19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/98bc9e19

Branch: refs/heads/master
Commit: 98bc9e19c153ac6f70ec58fbe37fcb2abc9de3f7
Parents: a8bfbf1
Author: Jacques Nadeau <[email protected]>
Authored: Thu Sep 5 21:17:58 2013 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu Sep 5 22:39:55 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/physical/impl/ScanBatch.java  |  2 +-
 .../drill/exec/physical/impl/aggregate/AggBatch.java    |  2 +-
 .../apache/drill/exec/physical/impl/sort/SortBatch.java |  4 ++--
 .../apache/drill/exec/record/AbstractRecordBatch.java   |  2 +-
 .../drill/exec/record/AbstractSingleRecordBatch.java    |  2 +-
 .../apache/drill/exec/record/HyperVectorWrapper.java    |  4 ++--
 .../apache/drill/exec/record/SimpleVectorWrapper.java   |  2 +-
 .../org/apache/drill/exec/record/VectorContainer.java   | 12 ++++++++----
 .../org/apache/drill/exec/record/VectorWrapper.java     |  2 +-
 .../drill/exec/record/selection/SelectionVector4.java   |  2 ++
 .../apache/drill/exec/vector/BaseDataValueVector.java   | 12 +++++++++---
 .../exec/store/parquet/ParquetRecordReaderTest.java     |  2 +-
 12 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ae043ec..a02e5f7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -156,7 +156,7 @@ public class ScanBatch implements RecordBatch {
     @Override
     public void removeAllFields() {
       for(VectorWrapper<?> vw : container){
-        vw.release();
+        vw.clear();
       }
       container.clear();
       fieldVectorMap.clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
index 97e66f9..806239b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/AggBatch.java
@@ -90,7 +90,7 @@ public class AggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
       logger.debug("Aggregator response {}, records {}", out, 
aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.clear();
+        container.zeroVectors();
         done = true;
         return aggregator.getOutcome();
       case RETURN_OUTCOME:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 09ae687..61bcf34 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -75,7 +75,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
   @Override
   protected void cleanup() {
     super.cleanup();
-    container.clear();
+    container.zeroVectors();;
     sv4.clear();
   }
 
@@ -98,7 +98,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
           break outer;
         case NOT_YET:
         case STOP:
-          container.clear();
+          container.zeroVectors();
           return upstream;
         case OK_NEW_SCHEMA:
           // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 0d44368..aba023d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -42,7 +42,7 @@ public abstract class AbstractRecordBatch<T extends 
PhysicalOperator> implements
 
   @Override
   public void kill() {
-    container.clear();
+    container.zeroVectors();
     killIncoming();
     cleanup();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index fb2fc3a..63c31a4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -29,7 +29,7 @@ public abstract class AbstractSingleRecordBatch<T extends 
PhysicalOperator> exte
     case NONE:
     case NOT_YET:
     case STOP:
-      container.clear();
+      container.zeroVectors();
       return upstream;
     case OK_NEW_SCHEMA:
       try{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index 5bb6208..5d07849 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -50,14 +50,14 @@ public class HyperVectorWrapper<T extends ValueVector> 
implements VectorWrapper<
   }
 
   @Override
-  public void release() {
+  public void clear() {
     if(!releasable) return;
     for(T x : vectors){
       x.clear();  
     }
     
   }
-  
+
   @Override
   @SuppressWarnings("unchecked")
   public VectorWrapper<T> cloneAndTransfer() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index 62ca8a4..7ff4b53 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -47,7 +47,7 @@ public class SimpleVectorWrapper<T extends ValueVector> 
implements VectorWrapper
   }
 
   @Override
-  public void release() {
+  public void clear() {
     v.clear();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 7c1e0ad..14c3a8d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -93,7 +93,7 @@ public class VectorContainer implements 
Iterable<VectorWrapper<?>> {
     for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); 
iter.hasNext();) {
       VectorWrapper<?> w = iter.next();
       if (!w.isHyper() && v == w.getValueVector()) {
-        w.release();
+        w.clear();
         iter.remove();
         return;
       }
@@ -148,11 +148,15 @@ public class VectorContainer implements 
Iterable<VectorWrapper<?>> {
   public void clear() {
     // TODO: figure out a better approach for this.
     // we don't clear schema because we want empty batches to carry previous 
schema to avoid extra schema update for no
-    // data.
+    // data.  
     // schema = null;
+    zeroVectors();
+    wrappers.clear();
+  }
+  
+  public void zeroVectors(){
     for (VectorWrapper<?> w : wrappers) {
-      w.release();
+      w.clear();
     }
-    wrappers.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index 1c5308e..5188cc3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -10,6 +10,6 @@ public interface VectorWrapper<T extends ValueVector> {
   public T getValueVector();
   public T[] getValueVectors();
   public boolean isHyper();
-  public void release();
+  public void clear();
   public VectorWrapper<T> cloneAndTransfer();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index ebfb9e4..4b6c2c4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -92,6 +92,8 @@ public class SelectionVector4 {
   }
   
   public void clear(){
+    start = 0;
+    length = 0;
     this.vector.clear();
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index f41dcd2..b82b14e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -32,14 +32,20 @@ public abstract class BaseDataValueVector extends 
BaseValueVector{
   
   @Override
   public ByteBuf[] getBuffers(){
-    ByteBuf[] out = new ByteBuf[]{data};
-    data.readerIndex(0);
-    data.retain();
+    ByteBuf[] out;
+    if(valueCount == 0){
+      out = new ByteBuf[0];
+    }else{
+      out = new ByteBuf[]{data};
+      data.readerIndex(0);
+      data.retain();
+    }
     clear();
     return out;
   }
   
   public int getBufferSize() {
+    if(valueCount == 0) return 0;
     return data.writerIndex();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/98bc9e19/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index e66ec49..267848e 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -209,7 +209,7 @@ public class ParquetRecordReaderTest {
       }
 
       for(VectorWrapper<?> vw : batchLoader){
-        vw.release();
+        vw.clear();
       }
       result.release();
       

Reply via email to