DRILL-856: Reduce code-generation size for MergingReceiver

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b1d91c81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b1d91c81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b1d91c81

Branch: refs/heads/master
Commit: b1d91c8187d197991306b4d054db135121ceca85
Parents: 58cf129
Author: Steven Phillips <[email protected]>
Authored: Thu May 29 03:18:28 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Thu May 29 03:18:28 2014 -0700

----------------------------------------------------------------------
 .../MergingReceiverGeneratorBase.java           |  13 +-
 .../mergereceiver/MergingReceiverTemplate.java  |  33 +-
 .../impl/mergereceiver/MergingRecordBatch.java  | 314 ++++---------------
 3 files changed, 75 insertions(+), 285 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
index 46a156f..237f2f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
@@ -23,25 +23,26 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorAccessible;
 
 import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
 
 public interface MergingReceiverGeneratorBase {
   
   public abstract void doSetup(FragmentContext context,
-                               RecordBatchLoader[] incomingBatchLoaders,
-                               RecordBatch outgoing) throws 
SchemaChangeException;
+                               VectorAccessible incoming,
+                               VectorAccessible outgoing) throws 
SchemaChangeException;
 
-  public abstract int doCompare(MergingRecordBatch.Node left,
-                                MergingRecordBatch.Node right);
+  public abstract int doEval(int leftIndex,
+                                int rightIndex);
 
-  public abstract boolean doCopy(int inBatch, int inIndex, int outIndex);
+  public abstract boolean doCopy(int inIndex, int outIndex);
 
   public static TemplateClassDefinition<MergingReceiverGeneratorBase> 
TEMPLATE_DEFINITION =
       new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, 
MergingReceiverTemplate.class);
 
   public final MappingSet compareMapping =
-    new MappingSet("left.valueIndex", "right.valueIndex",
+    new MappingSet("leftIndex", "rightIndex",
       GM("doSetup", "doCompare", null, null),
       GM("doSetup", "doCompare", null, null));
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
index 197e960..002e054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorAccessible;
 
 import javax.inject.Named;
 
@@ -29,34 +30,12 @@ public abstract class MergingReceiverTemplate implements 
MergingReceiverGenerato
 
   public MergingReceiverTemplate() throws SchemaChangeException { }
 
-  /**
-   * Enter the generated setup routine
-   * @param context current fragment context
-   * @param incomingBatchLoaders one RecordBatchLoader per sender
-   * @param outgoing outgoing RecordBatch iterator
-   * @throws SchemaChangeException
-   */
   public abstract void doSetup(@Named("context") FragmentContext context,
-                               @Named("incomingBatchLoaders") 
RecordBatchLoader[] incomingBatchLoaders,
-                               @Named("outgoing") RecordBatch outgoing) throws 
SchemaChangeException;
+                               @Named("incoming") VectorAccessible incoming,
+                               @Named("outgoing") VectorAccessible outgoing) 
throws SchemaChangeException;
 
-  /**
-   * Enter the generated comparator
-   * @param leftNode  reference to the left-hand value and vector
-   * @param rightNode reference to the right-hand value and vector
-   * @return
-   */
-  public abstract int doCompare(@Named("leftNode") MergingRecordBatch.Node 
leftNode,
-                                @Named("rightNode") MergingRecordBatch.Node 
rightNode);
-
-  /**
-   * Enter the generated copy function
-   * @param inBatch incoming batch to copy from
-   * @param inIndex incoming record position to copy from
-   * @param outIndex outgoing record position to copy to
-   */
-  public abstract boolean doCopy(@Named("inBatch") int inBatch, 
@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
-//  public abstract void doEval(@Named("inBatch") int inBatch, 
@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  public abstract int doEval(@Named("leftIndex") int leftIndex,
+                                @Named("rightIndex") int rightIndex);
 
+  public abstract boolean doCopy(@Named("inIndex") int inIndex, 
@Named("outIndex") int outIndex);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 9101202..be5bf76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -25,15 +25,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 
+import com.sun.codemodel.*;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -43,20 +47,12 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
+import org.apache.drill.exec.physical.impl.xsort.PriorityQueueCopier;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.AbstractRecordBatch;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.record.RawFragmentBatchProvider;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.SchemaBuilder;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.eigenbase.rel.RelFieldCollation.Direction;
@@ -65,13 +61,6 @@ import parquet.Preconditions;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
-import com.sun.codemodel.JArray;
-import com.sun.codemodel.JClass;
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JMod;
-import com.sun.codemodel.JType;
-import com.sun.codemodel.JVar;
 
 
 /**
@@ -100,7 +89,6 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
   private int[] batchOffsets;
   private PriorityQueue <Node> pqueue;
   private List<VectorAllocator> allocators;
-  private MergingReceiverPOP config;
 
   public static enum Metric implements MetricDef{
     NEXT_WAIT_NANOS;
@@ -118,7 +106,6 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
     super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
-    this.config = config;
     this.allocators = Lists.newArrayList();
     this.outgoingContainer = new VectorContainer();
   }
@@ -252,7 +239,9 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
       // allocate the priority queue with the generated comparator
       this.pqueue = new PriorityQueue<Node>(fragProviders.length, new 
Comparator<Node>() {
         public int compare(Node node1, Node node2) {
-          return merger.doCompare(node1, node2);
+          int leftIndex = (node1.batchId << 16) + node1.valueIndex;
+          int rightIndex = (node2.batchId << 16) + node2.valueIndex;
+          return merger.doEval(leftIndex, rightIndex);
         }
       });
 
@@ -444,248 +433,68 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
    */
   private MergingReceiverGeneratorBase createMerger() throws 
SchemaChangeException {
 
-    // set up the expression evaluator and code generation
-    final List<Ordering> orderings = config.getOrderings();
-    final ErrorCollector collector = new ErrorCollectorImpl();
-    final ClassGenerator<MergingReceiverGeneratorBase> cg =
-        
CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
-    JExpression inIndex = JExpr.direct("inIndex");
-    JExpression outIndex = JExpr.direct("outIndex");
-
-    JType valueVector2DArray = 
cg.getModel().ref(ValueVector.class).array().array();
-    JType valueVectorArray = cg.getModel().ref(ValueVector.class).array();
-    JType incomingBatchesType = 
cg.getModel().ref(RecordBatchLoader.class).array();
-    JType incomingBatchType = cg.getModel().ref(RecordBatchLoader.class);
-    JType recordBatchType = cg.getModel().ref(RecordBatch.class);
-
-    // declare a two-dimensional array of value vectors; batch is first 
dimension, ValueVectorId is the second
-    JVar incomingVectors = cg.clazz.field(JMod.NONE,
-      valueVector2DArray,
-      "incomingVectors");
-
-    // declare a two-dimensional array of vectors used to store a reference to 
all ValueVectors
-    // used in a comparison operation.  first dimension is the batch id.  each 
batch has one or more
-    // comparison vectors, maintaining the order defined by the OrderDef.
-    JVar comparisonVectors = cg.clazz.field(JMod.NONE,
-      valueVector2DArray,
-      "comparisonVectors");
-
-    // declare an array of incoming batches
-    JVar incomingBatchesVar = cg.clazz.field(JMod.NONE,
-      incomingBatchesType,
-      "incomingBatches");
-
-    // declare an array of outgoing vectors
-    JVar outgoingVectors = cg.clazz.field(JMod.NONE,
-      valueVectorArray,
-      "outgoingVectors");
-
-    // declare a reference to this MergingRecordBatch
-    JVar outgoingBatch = cg.clazz.field(JMod.NONE,
-      recordBatchType,
-      "outgoingBatch");
-
-    // create aliases for materializer
-    JVar incomingVar = cg.clazz.field(JMod.NONE, incomingBatchType, 
"incoming");
-    cg.getSetupBlock().assign(incomingBatchesVar, 
JExpr.direct("incomingBatchLoaders"));
-    cg.getSetupBlock().assign(outgoingBatch, JExpr.direct("outgoing"));
-
-    cg.setMappingSet(MergingReceiverGeneratorBase.compareMapping);
-
-    // evaluate expression on each incoming batch and create/initialize 2d 
array of incoming vectors.  For example:
-    //     incomingVectors = new ValueVector[][] {
-    //                         new ValueVector[] {vv1, vv2},
-    //                         new ValueVector[] {vv3, vv4}
-    //                       });
-    int fieldsPerBatch = 0; // number of fields per batch
-    int batchIdx = 0;
-    JArray incomingVectorInit = 
JExpr.newArray(cg.getModel().ref(ValueVector.class).array());
-    List <List<ValueVectorReadExpression>> cmpExpressions = 
Lists.newArrayList();
-    for (RecordBatchLoader batch : batchLoaders) {
-      JArray incomingVectorInitBatch = 
JExpr.newArray(cg.getModel().ref(ValueVector.class));
-      int fieldIdx = 0;
-      for (VectorWrapper<?> vv : batch) {
-        // declare incoming value vector and assign it to the array
-        JVar inVV = cg.declareVectorValueSetupAndMember("incomingBatches[" + 
batchIdx + "]",
-          new TypedFieldId(vv.getField().getType(),
-            false,
-            fieldIdx));
-
-        // add vv to initialization list (e.g. { vv1, vv2, vv3 } )
-        incomingVectorInitBatch.add(inVV);
-        ++fieldIdx;
-      }
-
-      // add VV array to initialization list (e.g. new ValueVector[] { ... })
-      incomingVectorInit.add(incomingVectorInitBatch);
-
-      // materialize each expression for this incoming batch
-      for (int i = 0; i < orderings.size(); ++i) {
-        cmpExpressions.add(new ArrayList<ValueVectorReadExpression>());
-        cg.getSetupBlock().assign(incomingVar, JExpr.direct("incomingBatches[" 
+ batchIdx + "]"));
-        LogicalExpression exprForCurrentBatch = 
ExpressionTreeMaterializer.materialize(orderings.get(i).getExpr(),
-                                                                               
        batch,
-                                                                               
        collector,
-                                                                               
        context.getFunctionRegistry());
-        if (collector.hasErrors()) {
-          throw new SchemaChangeException(
-              String.format("Failure while trying to materialize incoming 
schema.  Errors:\n %s.",
-                            collector.toErrorString()));
-        }
+    try {
+      CodeGenerator<MergingReceiverGeneratorBase> cg = 
CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
+      ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot();
 
-        // add materialized field expression to comparison list
-        if (exprForCurrentBatch instanceof ValueVectorReadExpression) {
-          cmpExpressions.get(batchIdx).add((ValueVectorReadExpression) 
exprForCurrentBatch);
-        }
-        else {
-          throw new SchemaChangeException("Invalid expression supplied to 
MergingReceiver operator");
+      ExpandableHyperContainer batch = null;
+      boolean first = true;
+      for (RecordBatchLoader loader : batchLoaders) {
+        if (first) {
+          batch = new ExpandableHyperContainer(loader);
+          first = false;
+        } else {
+          batch.addBatch(loader);
         }
       }
 
-      ++batchIdx;
-      fieldsPerBatch = fieldIdx;
-    }
+      generateComparisons(g, batch);
 
-    // write out the incoming vector initialization block
-    cg.getSetupBlock().assign(incomingVectors, incomingVectorInit);
-
-    // Generate the comparison function:
-    // The generated code compares the fields defined in each logical 
expression.  The batch index
-    // is supplied by the function caller (at runtime).  The comparison 
statements for each
-    // expression are generated for each schema change.  Inequality checks (< 
and >) for each batch
-    // are executed first to accommodate multiple expressions.  Equality is 
checked only for the last
-    // expression, and only if all previous expressions are equal.  Expression 
order is applied
-    // to the result of the FunctionCall.
-
-    JArray comparisonVectorInit = 
JExpr.newArray(cg.getModel().ref(ValueVector.class).array());
-    for (int b = 0; b < cmpExpressions.size(); ++b) {
-      JArray comparisonVectorInitBatch = 
JExpr.newArray(cg.getModel().ref(ValueVector.class));
-
-      for (ValueVectorReadExpression vvRead : cmpExpressions.get(b)) {
-        TypeProtos.DataMode mode = vvRead.getMajorType().getMode();
-        TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType();
-        Class cmpVectorClass = TypeHelper.getValueVectorClass(minor, mode);
-
-        JExpression arr = 
JExpr.newArray(cg.getModel().INT).add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0]));
-        comparisonVectorInitBatch.add(
-            ((JExpression) incomingBatchesVar.component(JExpr.lit(b)))
-               .invoke("getValueAccessorById")
-                 .arg(cg.getModel()._ref(cmpVectorClass).boxify().dotclass())
-                 .arg(arr)
-                   .invoke("getValueVector"));
+      g.setMappingSet(COPIER_MAPPING_SET);
+      CopyUtil.generateCopies(g, batch, true);
+      g.setMappingSet(MAIN_MAPPING);
+      MergingReceiverGeneratorBase merger = context.getImplementationClass(cg);
 
-      }
-      comparisonVectorInit.add(comparisonVectorInitBatch);
+      merger.doSetup(context, batch, outgoingContainer);
+      return merger;
+    } catch (ClassTransformationException | IOException e) {
+      throw new SchemaChangeException(e);
     }
+  }
 
-    cg.getSetupBlock().assign(comparisonVectors, comparisonVectorInit);
-
-    int comparisonVectorIndex = 0;
-    for (ValueVectorReadExpression vvRead : cmpExpressions.get(0)) {
-      // generate the comparison statements based on the first batch (assumes 
comparison fields are homogeneous)
-      TypeProtos.DataMode mode = vvRead.getMajorType().getMode();
-      TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType();
-      JType vectorType = 
cg.getModel()._ref(TypeHelper.getValueVectorClass(minor, mode));
-      JType valueType = TypeHelper.getHolderType(cg.getModel(), minor, mode);
-
-      // set up a holding container expression for left-hand side of function 
call
-      JVar leftVar = cg.getEvalBlock().decl(valueType, "leftValue" + 
comparisonVectorIndex, JExpr._new(valueType));
-      cg.getEvalBlock().add(((JExpression) JExpr.cast(vectorType,
-                                ((JExpression) comparisonVectors
-                                  .component(JExpr.direct("leftNode.batchId")))
-                                  
.component(JExpr.lit(comparisonVectorIndex))))
-        .invoke("getAccessor")
-        .invoke("get")
-        .arg(JExpr.direct("leftNode.valueIndex"))
-        .arg(leftVar));
-
-      ClassGenerator.HoldingContainer left = new 
ClassGenerator.HoldingContainer(vvRead.getMajorType(),
-                                                                               
leftVar,
-                                                                               
leftVar.ref("value"),
-                                                                               
leftVar.ref("isSet"));
-
-      // set up a holding container expression for right-hand side of function 
call
-      JVar rightVar = cg.getEvalBlock().decl(valueType, "rightValue" + 
comparisonVectorIndex, JExpr._new(valueType));
-      cg.getEvalBlock().add(((JExpression) JExpr.cast(vectorType,
-                                ((JExpression) comparisonVectors
-                                    
.component(JExpr.direct("rightNode.batchId")))
-                                    
.component(JExpr.lit(comparisonVectorIndex))))
-        .invoke("getAccessor")
-        .invoke("get")
-        .arg(JExpr.direct("rightNode.valueIndex"))
-        .arg(rightVar));
-
-      ClassGenerator.HoldingContainer right = new 
ClassGenerator.HoldingContainer(vvRead.getMajorType(),
-                                                                               
 rightVar,
-                                                                               
 rightVar.ref("value"),
-                                                                               
 rightVar.ref("isSet"));
-
-      // generate the comparison function
+  public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  public final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  public final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, 
ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
+  GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", 
null, null);
+  public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, 
COPIER_MAPPING);
+
+  private void generateComparisons(ClassGenerator g, VectorAccessible batch) 
throws SchemaChangeException {
+    g.setMappingSet(MAIN_MAPPING);
+
+    for(Ordering od : popConfig.getOrderings()){
+      // first, we rewrite the evaluation stack for each side of the 
comparison.
+      ErrorCollector collector = new ErrorCollectorImpl();
+      final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(od.getExpr(), batch, 
collector,context.getFunctionRegistry());
+      if(collector.hasErrors()) throw new SchemaChangeException("Failure while 
materializing expression. " + collector.toErrorString());
+      g.setMappingSet(LEFT_MAPPING);
+      HoldingContainer left = g.addExpr(expr, false);
+      g.setMappingSet(RIGHT_MAPPING);
+      HoldingContainer right = g.addExpr(expr, false);
+      g.setMappingSet(MAIN_MAPPING);
+
+      // next we wrap the two comparison sides and add the expression block 
for the comparison.
       LogicalExpression fh = FunctionGenerationHelper.getComparator(left, 
right, context.getFunctionRegistry());
-      ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
-
-      // generate less than/greater than checks (fixing results for ASCending 
vs. DESCending)
-      cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(1)))
-                       ._then()
-                       
._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection()
 == Direction.ASCENDING ? 1 : -1));
-
-      cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(-1)))
-                       ._then()
-                       
._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection()
 == Direction.ASCENDING ? -1 : 1));
-
-      ++comparisonVectorIndex;
-    }
+      HoldingContainer out = g.addExpr(fh, false);
+      JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
 
-    // if all expressions are equal, return 0
-    cg.getEvalBlock()._return(JExpr.lit(0));
-
-    // allocate a new array for outgoing vectors
-    cg.getSetupBlock().assign(outgoingVectors, 
JExpr.newArray(cg.getModel().ref(ValueVector.class), fieldsPerBatch));
-
-    // generate copy function and setup outgoing batches
-    cg.setMappingSet(MergingReceiverGeneratorBase.copyMapping);
-    int fieldIdx = 0;
-    for (VectorWrapper<?> vvOut : outgoingContainer) {
-      // declare outgoing value vectors
-      JVar outgoingVV = cg.declareVectorValueSetupAndMember("outgoingBatch",
-                                                            new 
TypedFieldId(vvOut.getField().getType(),
-                                                                             
vvOut.isHyper(), fieldIdx));
-
-      // assign to the appropriate slot in the outgoingVector array (in order 
of iteration)
-      
cg.getSetupBlock().assign(outgoingVectors.component(JExpr.lit(fieldIdx)), 
outgoingVV);
-
-      // get the vector's type info
-      Class<?> vvType = 
TypeHelper.getValueVectorClass(vvOut.getField().getType().getMinorType(),
-                                                       
vvOut.getField().getType().getMode());
-      JClass vvClass = cg.getModel().ref(vvType);
-
-      // generate the copyFrom() invocation with explicit cast to the 
appropriate type; for example:
-      // ((IntVector) outgoingVectors[i]).copyFrom(inIndex,
-      //                                           
outgoingBatch.getRecordCount(),
-      //                                           (IntVector) vv1);
-      cg.getEvalBlock()._if(
-        ((JExpression) JExpr.cast(vvClass, 
outgoingVectors.component(JExpr.lit(fieldIdx))))
-          .invoke("copyFromSafe")
-          .arg(inIndex)
-          .arg(outIndex)
-          .arg(JExpr.cast(vvClass,
-                          ((JExpression) 
incomingVectors.component(JExpr.direct("inBatch")))
-                            
.component(JExpr.lit(fieldIdx)))).not())._then()._return(JExpr.FALSE);
-
-      ++fieldIdx;
+      if(od.getDirection() == Direction.ASCENDING){
+        jc._then()._return(out.getValue());
+      }else{
+        jc._then()._return(out.getValue().minus());
+      }
     }
-    cg.rotateBlock();
-    cg.getEvalBlock()._return(JExpr.TRUE);
 
-    // compile generated code and call the generated setup method
-    MergingReceiverGeneratorBase newMerger;
-    try {
-      newMerger = context.getImplementationClass(cg);
-      newMerger.doSetup(context, batchLoaders, this);
-    } catch (ClassTransformationException | IOException e) {
-      throw new SchemaChangeException("Failure while attempting to load 
generated class", e);
-    }
-    return newMerger;
+    g.getEvalBlock()._return(JExpr.lit(0));
   }
 
   /**
@@ -695,7 +504,8 @@ public class MergingRecordBatch extends 
AbstractRecordBatch<MergingReceiverPOP>
    * @param node Reference to the next record to copy from the incoming batches
    */
   private boolean copyRecordToOutgoingBatch(Node node) {
-    if (!merger.doCopy(node.batchId, node.valueIndex, outgoingPosition)) {
+    int inIndex = (node.batchId << 16) + node.valueIndex;
+    if (!merger.doCopy(inIndex, outgoingPosition)) {
       return false;
     } else {
       outgoingPosition++;

Reply via email to