Repository: incubator-drill
Updated Branches:
  refs/heads/master c8a08c3e7 -> e1e5ea0ed


DRILL-854: PartitionSender generated code is too large


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/58cf129b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/58cf129b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/58cf129b

Branch: refs/heads/master
Commit: 58cf129bb801c40410b22f300150185fde857ce7
Parents: c8a08c3
Author: Steven Phillips <[email protected]>
Authored: Wed May 28 22:12:42 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Wed May 28 22:19:47 2014 -0700

----------------------------------------------------------------------
 .../partitionsender/OutgoingRecordBatch.java    | 262 ------------------
 .../PartitionSenderRootExec.java                | 252 +++++------------
 .../impl/partitionsender/Partitioner.java       |  18 +-
 .../partitionsender/PartitionerTemplate.java    | 272 +++++++++++++++++--
 .../impl/partitionsender/StatusHandler.java     |  64 +++++
 .../impl/svremover/RemovingRecordBatch.java     |  46 +---
 .../physical/impl/xsort/ExternalSortBatch.java  |   3 +-
 .../org/apache/drill/exec/vector/CopyUtil.java  |  65 +++++
 8 files changed, 469 insertions(+), 513 deletions(-)
----------------------------------------------------------------------
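
The change replaces per-destination code generation with a two-level design: the outer generated doEval() now only computes a destination bucket from the hash expression, and the per-field copy logic moves into a nested generated class (PartitionerTemplate.OutgoingRecordBatch) instantiated once per destination. A minimal, self-contained sketch of the resulting per-record control flow, using hypothetical names rather than the Drill classes in the diff below:

    import java.util.List;

    interface OutgoingBatch {
      boolean copy(int recordId);   // false when the destination vectors are full
      void flush();                 // send what has accumulated, then reset
    }

    class PartitionLoopSketch {
      static void partition(int recordCount, int[] hashes, List<OutgoingBatch> batches) {
        for (int recordId = 0; recordId < recordCount; recordId++) {
          int bucket = Math.abs(hashes[recordId] % batches.size()); // outer doEval()
          OutgoingBatch dest = batches.get(bucket);
          if (!dest.copy(recordId)) { // nested doEval() copies one record
            dest.flush();             // vectors full: send, reset, retry once
            dest.copy(recordId);
          }
        }
      }
    }

Previously the copy calls for every destination and every field were inlined into one generated method, so its size grew with destinations times fields; with the nested class the generated method size grows with the field count only.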


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
deleted file mode 100644
index c86da8c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-import io.netty.buffer.ByteBuf;
-
-import java.util.Iterator;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.proto.ExecProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.data.DataTunnel;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
-import org.apache.drill.exec.work.ErrorHelper;
-
-import com.google.common.base.Preconditions;
-
-/**
- * OutgoingRecordBatch is a holder of value vectors which are to be sent to another host.  Thus,
- * next() will never be called on this object.  When a record batch is ready to send (e.g. nearing size
- * limit or schema change), call flush() to send the batch.
- */
-public class OutgoingRecordBatch implements VectorAccessible {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutgoingRecordBatch.class);
-
-  private final DataTunnel tunnel;
-  private final HashPartitionSender operator;
-  private final RecordBatch incoming;
-  private final FragmentContext context;
-  private final BufferAllocator allocator;
-  private final VectorContainer vectorContainer = new VectorContainer();
-  private final SendingAccountor sendCount;
-  private final int oppositeMinorFragmentId;
-
-  private boolean isLast = false;
-  private volatile boolean ok = true;
-  private BatchSchema outSchema;
-  private int recordCount;
-  private OperatorStats stats;
-  private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
-  private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
-
-  public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming,
-                             FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
-    this.incoming = incoming;
-    this.context = context;
-    this.allocator = allocator;
-    this.operator = operator;
-    this.tunnel = tunnel;
-    this.sendCount = sendCount;
-    this.stats = stats;
-    this.oppositeMinorFragmentId = oppositeMinorFragmentId;
-  }
-
-  public void flushIfNecessary() {
-    try {
-      if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
-        flush();
-        stats.addLongStat(PartitionSenderStats.BATCHES_SENT, 1l);
-        stats.addLongStat(PartitionSenderStats.RECORDS_SENT, recordCount);
-      }
-    } catch (SchemaChangeException e) {
-      incoming.kill();
-      logger.error("Error flushing outgoing batches", e);
-      context.fail(e);
-    }
-  }
-
-  public void incRecordCount() {
-    ++recordCount;
-  }
-
-  /**
-   * Send the record batch to the target node, then reset the value vectors
-   *
-   * @return true if a flush was needed; otherwise false
-   * @throws SchemaChangeException
-   */
-  public boolean flush() throws SchemaChangeException {
-    final ExecProtos.FragmentHandle handle = context.getHandle();
-
-    if (recordCount != 0) {
-
-      for(VectorWrapper<?> w : vectorContainer){
-        w.getValueVector().getMutator().setValueCount(recordCount);
-      }
-
-
-//      BatchPrinter.printBatch(vectorContainer);
-
-      FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
-                                                                      handle.getQueryId(),
-                                                                      handle.getMajorFragmentId(),
-                                                                      handle.getMinorFragmentId(),
-                                                                      operator.getOppositeMajorFragmentId(),
-                                                                      oppositeMinorFragmentId,
-                                                                      getWritableBatch());
-
-      tunnel.sendRecordBatch(statusHandler, writableBatch);
-      this.sendCount.increment();
-    } else {
-      logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
-      if (isLast) {
-        // send final (empty) batch
-        FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
-                                                                        handle.getQueryId(),
-                                                                        handle.getMajorFragmentId(),
-                                                                        handle.getMinorFragmentId(),
-                                                                        operator.getOppositeMajorFragmentId(),
-                                                                        oppositeMinorFragmentId,
-                                                                        getWritableBatch());
-        tunnel.sendRecordBatch(statusHandler, writableBatch);
-        this.sendCount.increment();
-        vectorContainer.clear();
-        return true;
-      }
-    }
-
-    // reset values and reallocate the buffer for each value vector based on the incoming batch.
-    // NOTE: the value vector is directly referenced by generated code; therefore references
-    // must remain valid.
-    recordCount = 0;
-    vectorContainer.zeroVectors();
-    for (VectorWrapper<?> v : vectorContainer) {
-//      logger.debug("Reallocating vv to capacity " + DEFAULT_RECORD_BATCH_SIZE + " after flush.");
-      VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
-    }
-    if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); }
-    return true;
-  }
-
-
-  /**
-   * Create a new output schema and allocate space for value vectors based on the incoming record batch.
-   */
-  public void initializeBatch() {
-    isLast = false;
-    vectorContainer.clear();
-
-    SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
-    for (VectorWrapper<?> v : incoming) {
-
-      // add field to the output schema
-      bldr.addField(v.getField());
-
-      // allocate a new value vector
-      ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
-      VectorAllocator.getAllocator(outgoingVector, DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
-      vectorContainer.add(outgoingVector);
-//      logger.debug("Reallocating to cap " + DEFAULT_RECORD_BATCH_SIZE + " because of newly init'd vector : " + v.getValueVector());
-    }
-    outSchema = bldr.build();
-//    logger.debug("Initialized OutgoingRecordBatch.  RecordCount: " + recordCount + ", cap: " + DEFAULT_RECORD_BATCH_SIZE + " Schema: " + outSchema);
-  }
-
-  /**
-   * Free any existing value vectors, create new output schema, and allocate value vectors based
-   * on the incoming record batch.
-   */
-  public void resetBatch() {
-    isLast = false;
-    recordCount = 0;
-    for (VectorWrapper<?> v : vectorContainer){
-      v.getValueVector().clear();
-    }
-  }
-
-  public void setIsLast() {
-    isLast = true;
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    Preconditions.checkNotNull(outSchema);
-    return outSchema;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return recordCount;
-  }
-
-  @Override
-  public TypedFieldId getValueVectorId(SchemaPath path) {
-    return vectorContainer.getValueVectorId(path);
-  }
-
-  @Override
-  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
-    return vectorContainer.getValueAccessorById(clazz, fieldIds);
-  }
-
-  @Override
-  public Iterator<VectorWrapper<?>> iterator() {
-    return vectorContainer.iterator();
-  }
-
-  public WritableBatch getWritableBatch() {
-    return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
-  }
-
-
-  private StatusHandler statusHandler = new StatusHandler();
-  private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> {
-    RpcException ex;
-
-    @Override
-    public void success(Ack value, ByteBuf buffer) {
-      sendCount.decrement();
-      super.success(value, buffer);
-    }
-
-    @Override
-    public void failed(RpcException ex) {
-      sendCount.decrement();
-      logger.error("Failure while sending data to user.", ex);
-      ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
-      ok = false;
-      this.ex = ex;
-    }
-
-  }
-
-  public void clear(){
-    vectorContainer.clear();
-  }
-}
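
The deleted class above required the generated code to call incRecordCount() and flushIfNecessary() as separate helper invocations after every copy. Its replacement, the nested OutgoingRecordBatch in PartitionerTemplate further down, folds the count-and-flush check into a single copy() method; the shape of that consolidated path (restating the new method below, with BATCH_SIZE standing in for DEFAULT_RECORD_BATCH_SIZE) is:

    protected boolean copy(int inIndex) throws IOException {
      if (doEval(inIndex, recordCount)) { // generated per-field copyFromSafe calls
        recordCount++;
        if (recordCount == BATCH_SIZE) {
          flush();                        // proactive flush at the size threshold
        }
        return true;
      }
      return false;                       // caller flushes and retries once
    }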

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index d0eaf9a..74a3c90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.physical.impl.partitionsender;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -36,34 +35,32 @@ import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
-import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.rpc.data.DataTunnel;
 
-import com.sun.codemodel.JArray;
-import com.sun.codemodel.JClass;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JMod;
 import com.sun.codemodel.JType;
-import com.sun.codemodel.JVar;
+import org.apache.drill.exec.vector.CopyUtil;
+
 
 public class PartitionSenderRootExec implements RootExec {
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class);
   private RecordBatch incoming;
   private HashPartitionSender operator;
-  private OutgoingRecordBatch[] outgoing;
   private Partitioner partitioner;
   private FragmentContext context;
   private OperatorContext oContext;
   private boolean ok = true;
   private final SendingAccountor sendCount = new SendingAccountor();
   private final OperatorStats stats;
+  private final int outGoingBatchCount;
+  private final HashPartitionSender popConfig;
 
 
   public PartitionSenderRootExec(FragmentContext context,
@@ -75,18 +72,8 @@ public class PartitionSenderRootExec implements RootExec {
     this.context = context;
     this.oContext = new OperatorContext(operator, context);
     this.stats = oContext.getStats();
-    this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
-    int fieldId = 0;
-    for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) {
-      FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(operator.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
-      outgoing[fieldId] = new OutgoingRecordBatch(stats, sendCount, operator,
-                                                    context.getDataTunnel(endpoint, opposite),
-                                                    incoming,
-                                                    context,
-                                                    oContext.getAllocator(),
-                                                    fieldId);
-      fieldId++;
-    }
+    this.outGoingBatchCount = operator.getDestinations().size();
+    this.popConfig = operator;
   }
 
   @Override
@@ -105,11 +92,12 @@ public class PartitionSenderRootExec implements RootExec {
       case NONE:
         try {
           // send any pending batches
-          for (OutgoingRecordBatch batch : outgoing) {
-            batch.setIsLast();
-            batch.flush();
+          if(partitioner != null) {
+            partitioner.flushOutgoingBatches(true, false);
+          } else {
+            sendEmptyBatch();
           }
-        } catch (SchemaChangeException e) {
+        } catch (IOException e) {
           incoming.kill();
           logger.error("Error while creating partitioning sender or flushing 
outgoing batches", e);
           context.fail(e);
@@ -117,8 +105,8 @@ public class PartitionSenderRootExec implements RootExec {
         return false;
 
       case STOP:
-        for (OutgoingRecordBatch batch : outgoing) {
-          batch.clear();
+        if (partitioner != null) {
+          partitioner.clear();
         }
         return false;
 
@@ -127,22 +115,31 @@ public class PartitionSenderRootExec implements RootExec {
         try {
           // send all existing batches
           if (partitioner != null) {
-            flushOutgoingBatches(false, true);
-          }
-          for (OutgoingRecordBatch b : outgoing) {
-            b.initializeBatch();
+            partitioner.flushOutgoingBatches(false, true);
+            partitioner.clear();
           }
-          // update OutgoingRecordBatch's schema and generate partitioning code
+          // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code
           createPartitioner();
+        } catch (IOException e) {
+          incoming.kill();
+          logger.error("Error while flushing outgoing batches", e);
+          context.fail(e);
+          return false;
         } catch (SchemaChangeException e) {
           incoming.kill();
-          logger.error("Error while creating partitioning sender or flushing 
outgoing batches", e);
+          logger.error("Error while setting up partitioner", e);
           context.fail(e);
           return false;
         }
       case OK:
         stats.batchReceived(0, incoming.getRecordCount(), newSchema);
-        partitioner.partitionBatch(incoming);
+        try {
+          partitioner.partitionBatch(incoming);
+        } catch (IOException e) {
+          incoming.kill();
+          context.fail(e);
+          return false;
+        }
         for (VectorWrapper v : incoming) {
           v.clear();
         }
@@ -153,31 +150,6 @@ public class PartitionSenderRootExec implements RootExec {
     }
   }
 
-
-
-  private void generatePartitionFunction() throws SchemaChangeException {
-
-    LogicalExpression filterExpression = operator.getExpr();
-    final ErrorCollector collector = new ErrorCollectorImpl();
-    final ClassGenerator<Partitioner> cg = CodeGenerator.get(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry()).getRoot();
-
-    final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector,context.getFunctionRegistry());
-    if(collector.hasErrors()){
-      throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
-    }
-
-    cg.addExpr(new ReturnValueExpression(expr));
-
-    try {
-      Partitioner p = context.getImplementationClass(cg);
-      p.setup(context, incoming, outgoing);
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
-    }
-
-
-  }
-
   private void createPartitioner() throws SchemaChangeException {
 
     // set up partitioning function
@@ -188,6 +160,7 @@ public class PartitionSenderRootExec implements RootExec {
     boolean hyper = false;
 
     cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+    ClassGenerator<Partitioner> cgInner = cg.getInnerGenerator("OutgoingRecordBatch");
 
     final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry());
     if (collector.hasErrors()) {
@@ -197,153 +170,56 @@ public class PartitionSenderRootExec {
     }
 
     // generate code to copy from an incoming value vector to the destination partition's outgoing value vector
-    JExpression inIndex = JExpr.direct("inIndex");
     JExpression bucket = JExpr.direct("bucket");
-    JType outgoingVectorArrayType = cg.getModel().ref(ValueVector.class).array().array();
-    JType outgoingBatchArrayType = cg.getModel().ref(OutgoingRecordBatch.class).array();
 
     // generate evaluate expression to determine the hash
     ClassGenerator.HoldingContainer exprHolder = cg.addExpr(materializedExpr);
-    cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "bucket", exprHolder.getValue().mod(JExpr.lit(outgoing.length)));
-    cg.getEvalBlock().assign(JExpr.ref("bucket"), cg.getModel().ref(Math.class).staticInvoke("abs").arg(bucket));
-    // declare and assign the array of outgoing record batches
-    JVar outgoingBatches = cg.clazz.field(JMod.NONE,
-        outgoingBatchArrayType,
-        "outgoingBatches");
-    cg.getSetupBlock().assign(outgoingBatches, JExpr.direct("outgoing"));
-
-    // declare a two-dimensional array of value vectors; batch is first dimension, ValueVector is the second
-    JVar outgoingVectors = cg.clazz.field(JMod.NONE,
-                                          outgoingVectorArrayType,
-                                          "outgoingVectors");
-
-    // create 2d array and build initialization list.  For example:
-    //     outgoingVectors = new ValueVector[][] {
-    //                              new ValueVector[] {vv1, vv2},
-    //                              new ValueVector[] {vv3, vv4}
-    //                       });
-    JArray outgoingVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array());
-
-    int fieldId = 0;
-    int batchId = 0;
-    for (OutgoingRecordBatch batch : outgoing) {
-
-      JArray outgoingVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class));
-      for (VectorWrapper<?> vv : batch) {
-        // declare outgoing value vector and assign it to the array
-        JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]",
-                                                         new TypedFieldId(vv.getField().getType(),
-                                                                          false,
-                                                                          fieldId));
-        // add vv to initialization list (e.g. { vv1, vv2, vv3 } )
-        outgoingVectorInitBatch.add(outVV);
-        ++fieldId;
-      }
-
-      // add VV array to initialization list (e.g. new ValueVector[] { ... })
-      outgoingVectorInit.add(outgoingVectorInitBatch);
-      ++batchId;
-      fieldId = 0;
-    }
-
-    // generate outgoing value vector 2d array initialization list.
-    cg.getSetupBlock().assign(outgoingVectors, outgoingVectorInit);
+    cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "bucket", exprHolder.getValue().mod(JExpr.lit(outGoingBatchCount)));
+    cg.getEvalBlock()._return(cg.getModel().ref(Math.class).staticInvoke("abs").arg(bucket));
 
-    for (VectorWrapper<?> vvIn : incoming) {
-      // declare incoming value vectors
-      JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
-          vvIn.isHyper(),
-          fieldId));
+    CopyUtil.generateCopies(cgInner, incoming, incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
 
-      // generate the copyFrom() invocation with explicit cast to the appropriate type
-      Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
-                                                       vvIn.getField().getType().getMode());
-      JClass vvClass = cg.getModel().ref(vvType);
-
-      if (!hyper) {
-        // the following block generates calls to copyFrom(); e.g.:
-        // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
-        //                                                     outgoingBatches[bucket].getRecordCount(),
-        //                                                     vv1);
-        cg.getEvalBlock()._if(
-          ((JExpression) JExpr.cast(vvClass,
-                ((JExpression)
-                       outgoingVectors
-                         .component(bucket))
-                         .component(JExpr.lit(fieldId))))
-                         .invoke("copyFromSafe")
-                           .arg(inIndex)
-                           .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
-                           .arg(incomingVV).not())
-                         ._then()
-                           .add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
-                           ._return(JExpr.lit(false));
-      } else {
-        // the following block generates calls to copyFrom(); e.g.:
-        // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
-        //                                                     outgoingBatches[bucket].getRecordCount(),
-        //                                                     vv1[((inIndex)>>> 16)]);
-        cg.getEvalBlock()._if(
-          ((JExpression) JExpr.cast(vvClass,
-                ((JExpression)
-                       outgoingVectors
-                         .component(bucket))
-                         .component(JExpr.lit(fieldId))))
-                         .invoke("copyFromSafe")
-                           .arg(inIndex)
-                           .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
-                           .arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())
-                         ._then()
-                           .add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
-                           ._return(JExpr.lit(false));
-
-      }
-      ++fieldId;
-    }
-    // generate the OutgoingRecordBatch helper invocations
-    cg.getEvalBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("incRecordCount"));
-    cg.getEvalBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("flushIfNecessary"));
-    cg.getEvalBlock()._return(JExpr.lit(true));
     try {
       // compile and setup generated code
 //      partitioner = context.getImplementationClassMultipleOutput(cg);
       partitioner = context.getImplementationClass(cg);
-      partitioner.setup(context, incoming, outgoing);
+      partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext);
 
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
   }
 
-  /**
-   * Flush each outgoing record batch, and optionally reset the state of each outgoing record
-   * batch (on schema change).  Note that the schema is updated based on incoming at the time
-   * this function is invoked.
-   *
-   * @param isLastBatch    true if this is the last incoming batch
-   * @param schemaChanged  true if the schema has changed
-   */
-  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws SchemaChangeException {
-    for (OutgoingRecordBatch batch : outgoing) {
-      logger.debug("Attempting to flush all outgoing batches");
-      if (isLastBatch)
-        batch.setIsLast();
-      batch.flush();
-      if (schemaChanged) {
-        batch.resetBatch();
-        batch.initializeBatch();
-      }
-    }
-  }
-
   public void stop() {
     logger.debug("Partition sender stopping.");
     ok = false;
-    for(OutgoingRecordBatch b : outgoing){
-      b.clear();
+    if (partitioner != null) {
+      partitioner.clear();
     }
     sendCount.waitForSendComplete();
     oContext.close();
     incoming.cleanup();
   }
+
+  public void sendEmptyBatch() {
+    FragmentHandle handle = context.getHandle();
+    int fieldId = 0;
+    VectorContainer container = new VectorContainer();
+    StatusHandler statusHandler = new StatusHandler(sendCount, context);
+    for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
+      FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
+      DataTunnel tunnel = context.getDataTunnel(endpoint, opposite);
+      FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
+              handle.getQueryId(),
+              handle.getMajorFragmentId(),
+              handle.getMinorFragmentId(),
+              operator.getOppositeMajorFragmentId(),
+              fieldId,
+              WritableBatch.getBatchNoHVWrap(0, container, false));
+      tunnel.sendRecordBatch(statusHandler, writableBatch);
+      this.sendCount.increment();
+      fieldId++;
+    }
+  }
+
 }
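
In createPartitioner() above, the only code emitted into the outer eval block is the bucket computation: declare int bucket as the hash modulo the batch count, then return Math.abs(bucket). For readers unfamiliar with CodeModel, a self-contained example of that emission pattern (the class name and the literal 8 are illustrative, not Drill's):

    import com.sun.codemodel.JCodeModel;
    import com.sun.codemodel.JDefinedClass;
    import com.sun.codemodel.JExpr;
    import com.sun.codemodel.JMethod;
    import com.sun.codemodel.JMod;
    import com.sun.codemodel.JVar;
    import java.io.File;

    public class BucketEvalDemo {
      public static void main(String[] args) throws Exception {
        JCodeModel model = new JCodeModel();
        JDefinedClass clazz = model._class("GeneratedBucketEval");
        JMethod eval = clazz.method(JMod.PUBLIC, model.INT, "doEval");
        JVar hash = eval.param(model.INT, "hash");
        // int bucket = hash % 8;   (8 stands in for outGoingBatchCount)
        JVar bucket = eval.body().decl(model.INT, "bucket", hash.mod(JExpr.lit(8)));
        // return Math.abs(bucket);
        eval.body()._return(model.ref(Math.class).staticInvoke("abs").arg(bucket));
        model.build(new File("."));  // writes GeneratedBucketEval.java
      }
    }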

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 7d3998b..8d6c19a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -20,15 +20,27 @@ package org.apache.drill.exec.physical.impl.partitionsender;
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.record.RecordBatch;
 
+import java.io.IOException;
+
 public interface Partitioner {
 
   public abstract void setup(FragmentContext context,
-                             RecordBatch incoming,
-                             OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
+                          RecordBatch incoming,
+                          HashPartitionSender popConfig,
+                          OperatorStats stats,
+                          SendingAccountor sendingAccountor,
+                          OperatorContext oContext) throws SchemaChangeException;
 
-  public abstract void partitionBatch(RecordBatch incoming);
+  public abstract void partitionBatch(RecordBatch incoming) throws IOException;
+  public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
+  public abstract void initialize();
+  public abstract void clear();
 
   public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
 }
\ No newline at end of file
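
With the widened interface, the Partitioner owns its outgoing batches and the root exec only drives a small lifecycle. A trimmed-down sketch of that calling sequence (hypothetical interface; the real setup() above also takes the pop config, stats, accountor, and operator context):

    import java.io.IOException;

    interface PartitionerSketch {
      void partitionBatch(Object incoming) throws IOException;
      void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
      void clear();
    }

    class SenderLoopSketch {
      static void drive(PartitionerSketch p, Object batch, boolean lastBatch) throws IOException {
        p.partitionBatch(batch);               // on OK: route records to buckets
        if (lastBatch) {
          p.flushOutgoingBatches(true, false); // on NONE: drain, mark last
          p.clear();                           // release outgoing vectors
        }
      }
    }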

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index fe62b73..4a27262 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -19,20 +19,46 @@ package org.apache.drill.exec.physical.impl.partitionsender;
 
 import javax.inject.Named;
 
-import org.apache.drill.exec.ExecConstants;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.impl.SendingAccountor;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+import org.apache.drill.exec.work.ErrorHelper;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
 
 public abstract class PartitionerTemplate implements Partitioner {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerTemplate.class);
 
   private SelectionVector2 sv2;
   private SelectionVector4 sv4;
+  private RecordBatch incoming;
+  private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
 
   private static final String REWRITE_MSG = "Failed to write the record {} in available space. Attempting to rewrite.";
   private static final String RECORD_TOO_BIG_MSG = "Record {} is too big to fit into the allocated memory of ValueVector.";
@@ -40,12 +66,26 @@ public abstract class PartitionerTemplate implements Partitioner {
   public PartitionerTemplate() throws SchemaChangeException {
   }
 
-  @Override
   public final void setup(FragmentContext context,
                           RecordBatch incoming,
-                          OutgoingRecordBatch[] outgoing) throws SchemaChangeException {
+                          HashPartitionSender popConfig,
+                          OperatorStats stats,
+                          SendingAccountor sendingAccountor,
+                          OperatorContext oContext) throws SchemaChangeException {
+
+    this.incoming = incoming;
+    doSetup(context, incoming, null);
+
+    int fieldId = 0;
+    for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
+      FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
+      outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId));
+      fieldId++;
+    }
 
-    doSetup(context, incoming, outgoing);
+    for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) {
+      outgoingRecordBatch.initializeBatch();
+    }
 
     SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
     switch(svMode){
@@ -65,17 +105,41 @@ public abstract class PartitionerTemplate implements Partitioner {
     }
   }
 
+  /**
+   * Flush each outgoing record batch, and optionally reset the state of each outgoing record
+   * batch (on schema change).  Note that the schema is updated based on incoming at the time
+   * this function is invoked.
+   *
+   * @param isLastBatch    true if this is the last incoming batch
+   * @param schemaChanged  true if the schema has changed
+   */
+  public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException {
+    for (OutgoingRecordBatch batch : outgoingBatches) {
+      logger.debug("Attempting to flush all outgoing batches");
+      if (isLastBatch) {
+        batch.setIsLast();
+      }
+      batch.flush();
+      if (schemaChanged) {
+        batch.resetBatch();
+        batch.initializeBatch();
+      }
+    }
+  }
+
   @Override
-  public void partitionBatch(RecordBatch incoming) {
+  public void partitionBatch(RecordBatch incoming) throws IOException {
     SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
 
     // Keeping the for loop inside the case to avoid case evaluation for each record.
     switch(svMode) {
       case NONE:
         for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
-          if (!doEval(recordId)) {
+          OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(recordId));
+          if (!outgoingBatch.copy(recordId)) {
             logger.trace(REWRITE_MSG, recordId);
-            if (!doEval(recordId)) {
+            outgoingBatch.flush();
+            if (!outgoingBatch.copy(recordId)) {
               logger.debug(RECORD_TOO_BIG_MSG, recordId);
             }
           }
@@ -85,9 +149,10 @@ public abstract class PartitionerTemplate implements Partitioner {
       case TWO_BYTE:
         for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
           int svIndex = sv2.getIndex(recordId);
-          if (!doEval(svIndex)) {
-            logger.trace(REWRITE_MSG, recordId);
-            if (!doEval(svIndex)) {
+          OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
+          if (!outgoingBatch.copy(svIndex)) {
+            logger.trace(REWRITE_MSG, svIndex);
+            if (!outgoingBatch.copy(svIndex)) {
               logger.debug(RECORD_TOO_BIG_MSG, recordId);
             }
           }
@@ -97,9 +162,10 @@ public abstract class PartitionerTemplate implements Partitioner {
       case FOUR_BYTE:
         for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) {
           int svIndex = sv4.get(recordId);
-          if (!doEval(svIndex)) {
-            logger.trace(REWRITE_MSG, recordId);
-            if (!doEval(svIndex)) {
+          OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
+          if (!outgoingBatch.copy(svIndex)) {
+            logger.trace(REWRITE_MSG, svIndex);
+            if (!outgoingBatch.copy(svIndex)) {
               logger.debug(RECORD_TOO_BIG_MSG, recordId);
             }
           }
@@ -111,6 +177,180 @@ public abstract class PartitionerTemplate implements Partitioner {
     }
   }
 
+  @Override
+  public void clear() {
+    for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) {
+      outgoingRecordBatch.clear();
+    }
+  }
+
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
-  public abstract boolean doEval(@Named("inIndex") int inIndex);
+  public abstract int doEval(@Named("inIndex") int inIndex);
+
+  public class OutgoingRecordBatch implements VectorAccessible {
+
+    private final DataTunnel tunnel;
+    private final HashPartitionSender operator;
+    private final FragmentContext context;
+    private final BufferAllocator allocator;
+    private final VectorContainer vectorContainer = new VectorContainer();
+    private final SendingAccountor sendCount;
+    private final int oppositeMinorFragmentId;
+
+    private boolean isLast = false;
+    private BatchSchema outSchema;
+    private int recordCount;
+    private OperatorStats stats;
+    private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
+    private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
+
+    private StatusHandler statusHandler;
+
+    public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel,
+                               FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
+      this.context = context;
+      this.allocator = allocator;
+      this.operator = operator;
+      this.tunnel = tunnel;
+      this.sendCount = sendCount;
+      this.stats = stats;
+      this.oppositeMinorFragmentId = oppositeMinorFragmentId;
+      this.statusHandler = new StatusHandler(sendCount, context);
+    }
+
+    protected boolean copy(int inIndex) throws IOException {
+      if (doEval(inIndex, recordCount)) {
+        recordCount++;
+        if (recordCount == DEFAULT_RECORD_BATCH_SIZE) {
+          flush();
+        }
+        return true;
+      }
+      return false;
+    }
+
+    @RuntimeOverridden
+    protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {};
+
+    @RuntimeOverridden
+    protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; };
+
+    public void flush() throws IOException {
+      final ExecProtos.FragmentHandle handle = context.getHandle();
+
+      if (recordCount != 0) {
+
+        for(VectorWrapper<?> w : vectorContainer){
+          w.getValueVector().getMutator().setValueCount(recordCount);
+        }
+
+        FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+                handle.getQueryId(),
+                handle.getMajorFragmentId(),
+                handle.getMinorFragmentId(),
+                operator.getOppositeMajorFragmentId(),
+                oppositeMinorFragmentId,
+                getWritableBatch());
+
+        tunnel.sendRecordBatch(statusHandler, writableBatch);
+        this.sendCount.increment();
+      } else {
+        logger.debug("Flush requested on an empty outgoing record batch" + 
(isLast ? " (last batch)" : ""));
+        if (isLast) {
+          // send final (empty) batch
+          FragmentWritableBatch writableBatch = new 
FragmentWritableBatch(isLast,
+                  handle.getQueryId(),
+                  handle.getMajorFragmentId(),
+                  handle.getMinorFragmentId(),
+                  operator.getOppositeMajorFragmentId(),
+                  oppositeMinorFragmentId,
+                  getWritableBatch());
+          tunnel.sendRecordBatch(statusHandler, writableBatch);
+          this.sendCount.increment();
+          vectorContainer.clear();
+          return;
+        }
+      }
+
+      // reset values and reallocate the buffer for each value vector based on the incoming batch.
+      // NOTE: the value vector is directly referenced by generated code; therefore references
+      // must remain valid.
+      recordCount = 0;
+      vectorContainer.zeroVectors();
+      for (VectorWrapper<?> v : vectorContainer) {
+        VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
+      }
+      if (!statusHandler.isOk()) {
+        throw new IOException(statusHandler.getException());
+      }
+    }
+
+    public void initializeBatch() {
+      isLast = false;
+      vectorContainer.clear();
+
+      SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE);
+      for (VectorWrapper<?> v : incoming) {
+
+        // add field to the output schema
+        bldr.addField(v.getField());
+
+        // allocate a new value vector
+        ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
+        VectorAllocator.getAllocator(outgoingVector, DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
+        vectorContainer.add(outgoingVector);
+      }
+      outSchema = bldr.build();
+      doSetup(incoming, vectorContainer);
+    }
+
+    public void resetBatch() {
+      isLast = false;
+      recordCount = 0;
+      for (VectorWrapper<?> v : vectorContainer){
+        v.getValueVector().clear();
+      }
+    }
+
+    public void setIsLast() {
+      isLast = true;
+    }
+
+    @Override
+    public BatchSchema getSchema() {
+      Preconditions.checkNotNull(outSchema);
+      return outSchema;
+    }
+
+    @Override
+    public int getRecordCount() {
+      return recordCount;
+    }
+
+    @Override
+    public TypedFieldId getValueVectorId(SchemaPath path) {
+      return vectorContainer.getValueVectorId(path);
+    }
+
+    @Override
+    public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+      return vectorContainer.getValueAccessorById(clazz, fieldIds);
+    }
+
+    @Override
+    public Iterator<VectorWrapper<?>> iterator() {
+      return vectorContainer.iterator();
+    }
+
+    public WritableBatch getWritableBatch() {
+      return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
+    }
+
+
+
+
+    public void clear(){
+      vectorContainer.clear();
+    }
+  }
 }
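
partitionBatch() above retries a failed copy exactly once after a flush; if the retry also fails, the record cannot fit even in a freshly allocated batch and is only logged. A self-contained sketch of that contract, with a bounded list standing in for the value vectors:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class OutgoingBatchSketch {
      static final int CAPACITY = 4;            // stand-in for vector capacity
      final List<Integer> records = new ArrayList<>();

      boolean copy(int recordId) {              // mirrors copy(): false when full
        if (records.size() >= CAPACITY) return false;
        records.add(recordId);
        return true;
      }

      void flush() throws IOException {         // mirrors flush(): send, then reset
        System.out.println("sending " + records.size() + " records");
        records.clear();
      }

      void write(int recordId) throws IOException {
        if (!copy(recordId)) {                  // first attempt failed: batch full
          flush();
          if (!copy(recordId)) {                // still failing: record too big
            System.err.println("record " + recordId + " too big");
          }
        }
      }
    }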

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
new file mode 100644
index 0000000..e3f9eae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.partitionsender;
+
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.SendingAccountor;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.work.ErrorHelper;
+
+public class StatusHandler extends BaseRpcOutcomeListener<Ack> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatusHandler.class);
+  RpcException ex;
+  SendingAccountor sendCount;
+  FragmentContext context;
+  boolean ok = true;
+
+  public StatusHandler(SendingAccountor sendCount, FragmentContext context) {
+    this.sendCount = sendCount;
+    this.context = context;
+  }
+
+  @Override
+  public void success(Ack value, ByteBuf buffer) {
+    sendCount.decrement();
+    super.success(value, buffer);
+  }
+
+  @Override
+  public void failed(RpcException ex) {
+    sendCount.decrement();
+    logger.error("Failure while sending data to user.", ex);
+    ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger);
+    ok = false;
+    this.ex = ex;
+  }
+
+  public boolean isOk() {
+    return ok;
+  }
+
+  public RpcException getException() {
+    return ex;
+  }
+
+}
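
StatusHandler pairs with SendingAccountor: every sendRecordBatch() increments an in-flight count, and the listener decrements it on success and on failure alike, so stop() can block in waitForSendComplete() until all sends have been acknowledged. A minimal sketch of that accounting in plain JDK terms (hypothetical; Drill's SendingAccountor differs in detail):

    import java.util.concurrent.atomic.AtomicInteger;

    class SendAccountingSketch {
      private final AtomicInteger inFlight = new AtomicInteger();

      void beforeSend() { inFlight.incrementAndGet(); }       // sendCount.increment()

      void onOutcome(boolean success, Exception error) {      // success()/failed()
        inFlight.decrementAndGet();                           // decrement on BOTH paths
        if (!success) {
          System.err.println("send failed: " + error);        // surfaced on next flush()
        }
      }

      synchronized void waitForSendComplete() throws InterruptedException {
        while (inFlight.get() > 0) {
          wait(100);                                          // crude poll; real code signals
        }
      }
    }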

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 41b71d0..3f2e060 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -31,10 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.FixedWidthVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.*;
 import org.apache.drill.exec.vector.allocator.FixedVectorAllocator;
 import org.apache.drill.exec.vector.allocator.VariableEstimatedVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
@@ -210,7 +207,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
     try {
       final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
-      generateCopies(cg.getRoot(), incoming, false);
+      CopyUtil.generateCopies(cg.getRoot(), incoming, false);
       Copier copier = context.getImplementationClass(cg);
       copier.setupRemover(context, incoming, this, null);
 
@@ -235,7 +232,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
     try {
      final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
-      generateCopies(cg.getRoot(), batch, true);
+      CopyUtil.generateCopies(cg.getRoot(), batch, true);
       Copier copier = context.getImplementationClass(cg);
       copier.setupRemover(context, batch, outgoing, null);
 
@@ -245,43 +242,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     }
   }
 
-  public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){
-    // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
-    int fieldId = 0;
-
-    JExpression inIndex = JExpr.direct("inIndex");
-    JExpression outIndex = JExpr.direct("outIndex");
-    g.rotateBlock();
-    for(VectorWrapper<?> vv : batch){
-      JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId));
-      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
-
-      if(hyper){
-
-        g.getEvalBlock()._if(
-            outVV
-            .invoke("copyFromSafe")
-            .arg(
-                inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
-            .arg(outIndex)
-            .arg(
-                inVV.component(inIndex.shrz(JExpr.lit(16)))
-                )
-            .not()
-            )
-            ._then()._return(JExpr.FALSE);
-      }else{
-        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
-      }
-
-
-      fieldId++;
-    }
-    g.rotateBlock();
-    g.getEvalBlock()._return(JExpr.TRUE);
-  }
-
-
   @Override
   public WritableBatch getWritableBatch() {
     return WritableBatch.get(this);
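
All three users of the copy generator rely on the same contract: each generated copyFromSafe() call returns false when the outgoing vector runs out of room, and the generated eval bails out immediately so the caller can flush or reallocate. A self-contained sketch of that contract (VectorSketch stands in for Drill's value vectors; a real generated body has one line per field in the schema):

    interface VectorSketch {
      // mirrors copyFromSafe(): false when the destination has no room left
      boolean copyFromSafe(int inIndex, int outIndex, VectorSketch from);
    }

    class GeneratedCopySketch {
      VectorSketch in0, in1, out0, out1;

      boolean doEval(int inIndex, int outIndex) {
        if (!out0.copyFromSafe(inIndex, outIndex, in0)) return false; // field 0
        if (!out1.copyFromSafe(inIndex, outIndex, in1)) return false; // field 1
        return true; // every field copied; caller advances outIndex
      }
    }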

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 2289680..d6cbbc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -49,6 +49,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.hadoop.conf.Configuration;
@@ -508,7 +509,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         generateComparisons(g, batch);
 
         g.setMappingSet(COPIER_MAPPING_SET);
-        RemovingRecordBatch.generateCopies(g, batch, true);
+        CopyUtil.generateCopies(g, batch, true);
         g.setMappingSet(MAIN_MAPPING);
         copier = context.getImplementationClass(cg);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
new file mode 100644
index 0000000..1f09792
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+
+public class CopyUtil {
+  public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){
+    // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
+    int fieldId = 0;
+
+    JExpression inIndex = JExpr.direct("inIndex");
+    JExpression outIndex = JExpr.direct("outIndex");
+    g.rotateBlock();
+    for(VectorWrapper<?> vv : batch){
+      JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId));
+      JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
+
+      if(hyper){
+
+        g.getEvalBlock()._if(
+                outVV
+                        .invoke("copyFromSafe")
+                        .arg(
+                                inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+                        .arg(outIndex)
+                        .arg(
+                                inVV.component(inIndex.shrz(JExpr.lit(16)))
+                        )
+                        .not()
+        )
+                ._then()._return(JExpr.FALSE);
+      }else{
+        g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
+      }
+
+
+      fieldId++;
+    }
+    g.rotateBlock();
+    g.getEvalBlock()._return(JExpr.TRUE);
+  }
+
+}
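
In the hyper branch above, a single int index packs both a batch number and a record offset: the upper 16 bits select the batch (inIndex.shrz(JExpr.lit(16))) and the lower 16 bits, masked with Character.MAX_VALUE (0xFFFF), select the record within it. A self-contained illustration with made-up values:

    public class HyperIndexDemo {
      public static void main(String[] args) {
        int batchIndex = 3;
        int recordIndex = 1234;
        int packed = (batchIndex << 16) | recordIndex;     // how an SV4 entry packs both

        int unpackedBatch = packed >>> 16;                 // inIndex.shrz(JExpr.lit(16))
        int unpackedRecord = packed & Character.MAX_VALUE; // inIndex.band(0xFFFF)

        System.out.println(unpackedBatch + " " + unpackedRecord); // prints "3 1234"
      }
    }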
