DRILL-800: Partitioner is dropping records that can't fit in the available 
space of ValueVectors in OutgoingRecordBatch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6dd3ff9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6dd3ff9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6dd3ff9c

Branch: refs/heads/master
Commit: 6dd3ff9cb3fbfe4748cfe6f64d4ba1d4ecc404f2
Parents: ef28054
Author: vkorukanti <[email protected]>
Authored: Thu May 22 18:26:38 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed May 28 09:13:58 2014 -0700

----------------------------------------------------------------------
 .../partitionsender/OutgoingRecordBatch.java    | 20 ++---
 .../PartitionSenderRootExec.java                | 36 ++++-----
 .../impl/partitionsender/Partitioner.java       |  5 --
 .../partitionsender/PartitionerSV2Template.java | 60 ---------------
 .../partitionsender/PartitionerSV4Template.java | 61 ----------------
 .../partitionsender/PartitionerTemplate.java    | 77 +++++++++++++++++---
 6 files changed, 89 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 646f1a0..c86da8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -71,10 +71,9 @@ public class OutgoingRecordBatch implements VectorAccessible {
   private volatile boolean ok = true;
   private BatchSchema outSchema;
   private int recordCount;
-  private int recordCapacity;
-  private static int DEFAULT_ALLOC_SIZE = 20000;
-  private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048;
   private OperatorStats stats;
+  private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
+  private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
 
   public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, 
HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming,
                              FragmentContext context, BufferAllocator 
allocator, int oppositeMinorFragmentId) {
@@ -89,9 +88,8 @@ public class OutgoingRecordBatch implements VectorAccessible {
   }
 
   public void flushIfNecessary() {
-    if (recordCount == recordCapacity) logger.debug("Flush is necesary:  Count 
is " + recordCount + ", capacity is " + recordCapacity);
     try {
-      if (recordCount == recordCapacity){
+      if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
         flush();
         stats.addLongStat(PartitionSenderStats.BATCHES_SENT, 1l);
         stats.addLongStat(PartitionSenderStats.RECORDS_SENT, recordCount);
@@ -159,8 +157,8 @@ public class OutgoingRecordBatch implements VectorAccessible {
     recordCount = 0;
     vectorContainer.zeroVectors();
     for (VectorWrapper<?> v : vectorContainer) {
-//      logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " 
after flush.");
-      VectorAllocator.getAllocator(v.getValueVector(), 
DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE);
+//      logger.debug("Reallocating vv to capacity " + 
DEFAULT_RECORD_BATCH_SIZE + " after flush.");
+      VectorAllocator.getAllocator(v.getValueVector(), 
DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
     }
     if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
     return true;
@@ -173,7 +171,6 @@ public class OutgoingRecordBatch implements VectorAccessible {
   public void initializeBatch() {
     isLast = false;
     vectorContainer.clear();
-    recordCapacity = DEFAULT_ALLOC_SIZE;
 
     SchemaBuilder bldr = 
BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
     for (VectorWrapper<?> v : incoming) {
@@ -183,12 +180,12 @@ public class OutgoingRecordBatch implements VectorAccessible {
 
       // allocate a new value vector
       ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), 
allocator);
-      VectorAllocator.getAllocator(outgoingVector, 100).alloc(recordCapacity);
+      VectorAllocator.getAllocator(outgoingVector, 
DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
       vectorContainer.add(outgoingVector);
-//      logger.debug("Reallocating to cap " + recordCapacity + " because of 
newly init'd vector : " + v.getValueVector());
+//      logger.debug("Reallocating to cap " + DEFAULT_RECORD_BATCH_SIZE + " 
because of newly init'd vector : " + v.getValueVector());
     }
     outSchema = bldr.build();
-//    logger.debug("Initialized OutgoingRecordBatch.  RecordCount: " + 
recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema);
+//    logger.debug("Initialized OutgoingRecordBatch.  RecordCount: " + 
recordCount + ", cap: " + DEFAULT_RECORD_BATCH_SIZE + " Schema: " + outSchema);
   }
 
   /**
@@ -198,7 +195,6 @@ public class OutgoingRecordBatch implements VectorAccessible {
   public void resetBatch() {
     isLast = false;
     recordCount = 0;
-    recordCapacity = 0;
     for (VectorWrapper<?> v : vectorContainer){
       v.getValueVector().clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 7893be8..d0eaf9a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -187,20 +187,7 @@ public class PartitionSenderRootExec implements RootExec {
 
     boolean hyper = false;
 
-    switch(incoming.getSchema().getSelectionVectorMode()){
-    case NONE:
-      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
-      break;
-    case TWO_BYTE:
-      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV2, 
context.getFunctionRegistry());
-      break;
-    case FOUR_BYTE:
-      cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV4, 
context.getFunctionRegistry());
-      hyper = true;
-      break;
-    default:
-      throw new UnsupportedOperationException();
-    }
+    cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
 
     final LogicalExpression materializedExpr = 
ExpressionTreeMaterializer.materialize(expr, incoming, collector, 
context.getFunctionRegistry());
     if (collector.hasErrors()) {
@@ -285,10 +272,12 @@ public class PartitionSenderRootExec implements RootExec {
                          .component(bucket))
                          .component(JExpr.lit(fieldId))))
                          .invoke("copyFromSafe")
-                         .arg(inIndex)
-                         .arg(((JExpression) 
outgoingBatches.component(bucket)).invoke("getRecordCount"))
-                         .arg(incomingVV).not())._then().add(((JExpression) 
outgoingBatches.component(bucket)).invoke("flush"))
-                         ._return();
+                           .arg(inIndex)
+                           .arg(((JExpression) 
outgoingBatches.component(bucket)).invoke("getRecordCount"))
+                           .arg(incomingVV).not())
+                         ._then()
+                           .add(((JExpression) 
outgoingBatches.component(bucket)).invoke("flush"))
+                           ._return(JExpr.lit(false));
       } else {
         // the following block generates calls to copyFrom(); e.g.:
         // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
@@ -301,10 +290,12 @@ public class PartitionSenderRootExec implements RootExec {
                          .component(bucket))
                          .component(JExpr.lit(fieldId))))
                          .invoke("copyFromSafe")
-                         .arg(inIndex)
-                         .arg(((JExpression) 
outgoingBatches.component(bucket)).invoke("getRecordCount"))
-                         
.arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())._then().add(((JExpression)
 outgoingBatches.component(bucket)).invoke("flush"))
-                         ._return();
+                           .arg(inIndex)
+                           .arg(((JExpression) 
outgoingBatches.component(bucket)).invoke("getRecordCount"))
+                           
.arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())
+                         ._then()
+                           .add(((JExpression) 
outgoingBatches.component(bucket)).invoke("flush"))
+                           ._return(JExpr.lit(false));
 
       }
       ++fieldId;
@@ -312,6 +303,7 @@ public class PartitionSenderRootExec implements RootExec {
     // generate the OutgoingRecordBatch helper invocations
     cg.getEvalBlock().add(((JExpression) 
outgoingBatches.component(bucket)).invoke("incRecordCount"));
     cg.getEvalBlock().add(((JExpression) 
outgoingBatches.component(bucket)).invoke("flushIfNecessary"));
+    cg.getEvalBlock()._return(JExpr.lit(true));
     try {
       // compile and setup generated code
 //      partitioner = context.getImplementationClassMultipleOutput(cg);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 3ffead0..7d3998b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -31,9 +31,4 @@ public interface Partitioner {
   public abstract void partitionBatch(RecordBatch incoming);
 
   public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new 
TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
-
-  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV2 = 
new TemplateClassDefinition<>(Partitioner.class, PartitionerSV2Template.class);
-
-  public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV4 = 
new TemplateClassDefinition<>(Partitioner.class, PartitionerSV4Template.class);
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
deleted file mode 100644
index 981055a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-
-public abstract class PartitionerSV2Template implements Partitioner {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PartitionerSV2Template.class);
-
-  private SelectionVector2 sv2;
-
-  public PartitionerSV2Template() throws SchemaChangeException {
-  }
-
-  @Override
-  public final void setup(FragmentContext context,
-                          RecordBatch incoming,
-                          OutgoingRecordBatch[] outgoing) throws 
SchemaChangeException {
-
-    this.sv2 = incoming.getSelectionVector2();
-
-    doSetup(context, incoming, outgoing);
-
-  }
-
-  @Override
-  public void partitionBatch(RecordBatch incoming) {
-
-    for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
-      // for each record
-      doEval(sv2.getIndex(recordId), 0);
-    }
-
-  }
-
-  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") 
OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
-  public abstract void doEval(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
deleted file mode 100644
index 2e00f9b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-import javax.inject.Named;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-
-public abstract class PartitionerSV4Template implements Partitioner {
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PartitionerSV4Template.class);
-
-  private SelectionVector4 sv4;
-
-  public PartitionerSV4Template() throws SchemaChangeException {
-  }
-
-  @Override
-  public final void setup(FragmentContext context,
-                          RecordBatch incoming,
-                          OutgoingRecordBatch[] outgoing) throws 
SchemaChangeException {
-
-    this.sv4 = incoming.getSelectionVector4();
-
-    doSetup(context, incoming, outgoing);
-
-  }
-
-  @Override
-  public void partitionBatch(RecordBatch incoming) {
-
-    for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
-      // for each record
-      doEval(sv4.get(recordId), 0);
-    }
-
-  }
-
-  public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") 
OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
-  public abstract void doEval(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 1b0aad2..fe62b73 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -19,13 +19,24 @@ package org.apache.drill.exec.physical.impl.partitionsender;
 
 import javax.inject.Named;
 
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
 
 public abstract class PartitionerTemplate implements Partitioner {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PartitionerTemplate.class);
 
+  private SelectionVector2 sv2;
+  private SelectionVector4 sv4;
+
+  private static final String REWRITE_MSG = "Failed to write the record {} in 
available space. Attempting to rewrite.";
+  private static final String RECORD_TOO_BIG_MSG = "Record {} is too big to 
fit into the allocated memory of ValueVector.";
+
   public PartitionerTemplate() throws SchemaChangeException {
   }
 
@@ -36,24 +47,70 @@ public abstract class PartitionerTemplate implements Partitioner {
 
     doSetup(context, incoming, outgoing);
 
+    SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
+    switch(svMode){
+      case FOUR_BYTE:
+        this.sv4 = incoming.getSelectionVector4();
+        break;
+
+      case TWO_BYTE:
+        this.sv2 = incoming.getSelectionVector2();
+        break;
+
+      case NONE:
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Unknown selection vector 
mode: " + svMode.toString());
+    }
   }
 
   @Override
   public void partitionBatch(RecordBatch incoming) {
+    SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
 
-    for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
-      // for each record
+    // Keeping the for loop inside the case to avoid case evaluation for each 
record.
+    switch(svMode) {
+      case NONE:
+        for (int recordId = 0; recordId < incoming.getRecordCount(); 
++recordId) {
+          if (!doEval(recordId)) {
+            logger.trace(REWRITE_MSG, recordId);
+            if (!doEval(recordId)) {
+              logger.debug(RECORD_TOO_BIG_MSG, recordId);
+            }
+          }
+        }
+        break;
 
-      // TODO: if attempting to insert too large of a value into a vector:
-      //         - send the batch
-      //         - reallocate (at least the size of the current value) and try 
again
-      doEval(recordId, 0);
-    }
+      case TWO_BYTE:
+        for (int recordId = 0; recordId < incoming.getRecordCount(); 
++recordId) {
+          int svIndex = sv2.getIndex(recordId);
+          if (!doEval(svIndex)) {
+            logger.trace(REWRITE_MSG, recordId);
+            if (!doEval(svIndex)) {
+              logger.debug(RECORD_TOO_BIG_MSG, recordId);
+            }
+          }
+        }
+        break;
+
+      case FOUR_BYTE:
+        for (int recordId = 0; recordId < incoming.getRecordCount(); 
++recordId) {
+          int svIndex = sv4.get(recordId);
+          if (!doEval(svIndex)) {
+            logger.trace(REWRITE_MSG, recordId);
+            if (!doEval(svIndex)) {
+              logger.debug(RECORD_TOO_BIG_MSG, recordId);
+            }
+          }
+        }
+        break;
 
+      default:
+        throw new UnsupportedOperationException("Unknown selection vector 
mode: " + svMode.toString());
+    }
   }
 
   public abstract void doSetup(@Named("context") FragmentContext context, 
@Named("incoming") RecordBatch incoming, @Named("outgoing") 
OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
-  public abstract void doEval(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
-
-
+  public abstract boolean doEval(@Named("inIndex") int inIndex);
 }

Reply via email to