DRILL-856: Reduce code-generation size for MergingReceiver
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b1d91c81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b1d91c81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b1d91c81 Branch: refs/heads/master Commit: b1d91c8187d197991306b4d054db135121ceca85 Parents: 58cf129 Author: Steven Phillips <[email protected]> Authored: Thu May 29 03:18:28 2014 -0700 Committer: Steven Phillips <[email protected]> Committed: Thu May 29 03:18:28 2014 -0700 ---------------------------------------------------------------------- .../MergingReceiverGeneratorBase.java | 13 +- .../mergereceiver/MergingReceiverTemplate.java | 33 +- .../impl/mergereceiver/MergingRecordBatch.java | 314 ++++--------------- 3 files changed, 75 insertions(+), 285 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java index 46a156f..237f2f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java @@ -23,25 +23,26 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorAccessible; import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; public interface MergingReceiverGeneratorBase { public abstract void doSetup(FragmentContext context, - RecordBatchLoader[] incomingBatchLoaders, - RecordBatch outgoing) throws SchemaChangeException; + VectorAccessible incoming, + VectorAccessible outgoing) throws SchemaChangeException; - public abstract int doCompare(MergingRecordBatch.Node left, - MergingRecordBatch.Node right); + public abstract int doEval(int leftIndex, + int rightIndex); - public abstract boolean doCopy(int inBatch, int inIndex, int outIndex); + public abstract boolean doCopy(int inIndex, int outIndex); public static TemplateClassDefinition<MergingReceiverGeneratorBase> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, MergingReceiverTemplate.class); public final MappingSet compareMapping = - new MappingSet("left.valueIndex", "right.valueIndex", + new MappingSet("leftIndex", "rightIndex", GM("doSetup", "doCompare", null, null), GM("doSetup", "doCompare", null, null)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java index 197e960..002e054 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorAccessible; import javax.inject.Named; @@ -29,34 +30,12 @@ public abstract class MergingReceiverTemplate implements MergingReceiverGenerato public MergingReceiverTemplate() throws SchemaChangeException { } - /** - * Enter the generated setup routine - * @param context current fragment context - * @param incomingBatchLoaders one RecordBatchLoader per sender - * @param outgoing outgoing RecordBatch iterator - * @throws SchemaChangeException - */ public abstract void doSetup(@Named("context") FragmentContext context, - @Named("incomingBatchLoaders") RecordBatchLoader[] incomingBatchLoaders, - @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; + @Named("incoming") VectorAccessible incoming, + @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException; - /** - * Enter the generated comparator - * @param leftNode reference to the left-hand value and vector - * @param rightNode reference to the right-hand value and vector - * @return - */ - public abstract int doCompare(@Named("leftNode") MergingRecordBatch.Node leftNode, - @Named("rightNode") MergingRecordBatch.Node rightNode); - - /** - * Enter the generated copy function - * @param inBatch incoming batch to copy from - * @param inIndex incoming record position to copy from - * @param outIndex outgoing record position to copy to - */ - public abstract boolean doCopy(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - -// public abstract void doEval(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract int doEval(@Named("leftIndex") int leftIndex, + @Named("rightIndex") int rightIndex); + public abstract boolean doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b1d91c81/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 9101202..be5bf76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -25,15 +25,19 @@ import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; +import com.sun.codemodel.*; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.compile.sig.GeneratorMapping; +import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; @@ -43,20 +47,12 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.MergingReceiverPOP; +import org.apache.drill.exec.physical.impl.xsort.PriorityQueueCopier; import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.record.AbstractRecordBatch; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.RawFragmentBatch; -import org.apache.drill.exec.record.RawFragmentBatchProvider; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.record.SchemaBuilder; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.RelFieldCollation.Direction; @@ -65,13 +61,6 @@ import parquet.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.sun.codemodel.JArray; -import com.sun.codemodel.JClass; -import com.sun.codemodel.JExpr; -import com.sun.codemodel.JExpression; -import com.sun.codemodel.JMod; -import com.sun.codemodel.JType; -import com.sun.codemodel.JVar; /** @@ -100,7 +89,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private int[] batchOffsets; private PriorityQueue <Node> pqueue; private List<VectorAllocator> allocators; - private MergingReceiverPOP config; public static enum Metric implements MetricDef{ NEXT_WAIT_NANOS; @@ -118,7 +106,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> super(config, context); this.fragProviders = fragProviders; this.context = context; - this.config = config; this.allocators = Lists.newArrayList(); this.outgoingContainer = new VectorContainer(); } @@ -252,7 +239,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // allocate the priority queue with the generated comparator this.pqueue = new PriorityQueue<Node>(fragProviders.length, new Comparator<Node>() { public int compare(Node node1, Node node2) { - return merger.doCompare(node1, node2); + int leftIndex = (node1.batchId << 16) + node1.valueIndex; + int rightIndex = (node2.batchId << 16) + node2.valueIndex; + return merger.doEval(leftIndex, rightIndex); } }); @@ -444,248 +433,68 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> */ private MergingReceiverGeneratorBase createMerger() throws SchemaChangeException { - // set up the expression evaluator and code generation - final List<Ordering> orderings = config.getOrderings(); - final ErrorCollector collector = new ErrorCollectorImpl(); - final ClassGenerator<MergingReceiverGeneratorBase> cg = - CodeGenerator.getRoot(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - JExpression inIndex = JExpr.direct("inIndex"); - JExpression outIndex = JExpr.direct("outIndex"); - - JType valueVector2DArray = cg.getModel().ref(ValueVector.class).array().array(); - JType valueVectorArray = cg.getModel().ref(ValueVector.class).array(); - JType incomingBatchesType = cg.getModel().ref(RecordBatchLoader.class).array(); - JType incomingBatchType = cg.getModel().ref(RecordBatchLoader.class); - JType recordBatchType = cg.getModel().ref(RecordBatch.class); - - // declare a two-dimensional array of value vectors; batch is first dimension, ValueVectorId is the second - JVar incomingVectors = cg.clazz.field(JMod.NONE, - valueVector2DArray, - "incomingVectors"); - - // declare a two-dimensional array of vectors used to store a reference to all ValueVectors - // used in a comparison operation. first dimension is the batch id. each batch has one or more - // comparison vectors, maintaining the order defined by the OrderDef. - JVar comparisonVectors = cg.clazz.field(JMod.NONE, - valueVector2DArray, - "comparisonVectors"); - - // declare an array of incoming batches - JVar incomingBatchesVar = cg.clazz.field(JMod.NONE, - incomingBatchesType, - "incomingBatches"); - - // declare an array of outgoing vectors - JVar outgoingVectors = cg.clazz.field(JMod.NONE, - valueVectorArray, - "outgoingVectors"); - - // declare a reference to this MergingRecordBatch - JVar outgoingBatch = cg.clazz.field(JMod.NONE, - recordBatchType, - "outgoingBatch"); - - // create aliases for materializer - JVar incomingVar = cg.clazz.field(JMod.NONE, incomingBatchType, "incoming"); - cg.getSetupBlock().assign(incomingBatchesVar, JExpr.direct("incomingBatchLoaders")); - cg.getSetupBlock().assign(outgoingBatch, JExpr.direct("outgoing")); - - cg.setMappingSet(MergingReceiverGeneratorBase.compareMapping); - - // evaluate expression on each incoming batch and create/initialize 2d array of incoming vectors. For example: - // incomingVectors = new ValueVector[][] { - // new ValueVector[] {vv1, vv2}, - // new ValueVector[] {vv3, vv4} - // }); - int fieldsPerBatch = 0; // number of fields per batch - int batchIdx = 0; - JArray incomingVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array()); - List <List<ValueVectorReadExpression>> cmpExpressions = Lists.newArrayList(); - for (RecordBatchLoader batch : batchLoaders) { - JArray incomingVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class)); - int fieldIdx = 0; - for (VectorWrapper<?> vv : batch) { - // declare incoming value vector and assign it to the array - JVar inVV = cg.declareVectorValueSetupAndMember("incomingBatches[" + batchIdx + "]", - new TypedFieldId(vv.getField().getType(), - false, - fieldIdx)); - - // add vv to initialization list (e.g. { vv1, vv2, vv3 } ) - incomingVectorInitBatch.add(inVV); - ++fieldIdx; - } - - // add VV array to initialization list (e.g. new ValueVector[] { ... }) - incomingVectorInit.add(incomingVectorInitBatch); - - // materialize each expression for this incoming batch - for (int i = 0; i < orderings.size(); ++i) { - cmpExpressions.add(new ArrayList<ValueVectorReadExpression>()); - cg.getSetupBlock().assign(incomingVar, JExpr.direct("incomingBatches[" + batchIdx + "]")); - LogicalExpression exprForCurrentBatch = ExpressionTreeMaterializer.materialize(orderings.get(i).getExpr(), - batch, - collector, - context.getFunctionRegistry()); - if (collector.hasErrors()) { - throw new SchemaChangeException( - String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", - collector.toErrorString())); - } + try { + CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot(); - // add materialized field expression to comparison list - if (exprForCurrentBatch instanceof ValueVectorReadExpression) { - cmpExpressions.get(batchIdx).add((ValueVectorReadExpression) exprForCurrentBatch); - } - else { - throw new SchemaChangeException("Invalid expression supplied to MergingReceiver operator"); + ExpandableHyperContainer batch = null; + boolean first = true; + for (RecordBatchLoader loader : batchLoaders) { + if (first) { + batch = new ExpandableHyperContainer(loader); + first = false; + } else { + batch.addBatch(loader); } } - ++batchIdx; - fieldsPerBatch = fieldIdx; - } + generateComparisons(g, batch); - // write out the incoming vector initialization block - cg.getSetupBlock().assign(incomingVectors, incomingVectorInit); - - // Generate the comparison function: - // The generated code compares the fields defined in each logical expression. The batch index - // is supplied by the function caller (at runtime). The comparison statements for each - // expression are generated for each schema change. Inequality checks (< and >) for each batch - // are executed first to accommodate multiple expressions. Equality is checked only for the last - // expression, and only if all previous expressions are equal. Expression order is applied - // to the result of the FunctionCall. - - JArray comparisonVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array()); - for (int b = 0; b < cmpExpressions.size(); ++b) { - JArray comparisonVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class)); - - for (ValueVectorReadExpression vvRead : cmpExpressions.get(b)) { - TypeProtos.DataMode mode = vvRead.getMajorType().getMode(); - TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType(); - Class cmpVectorClass = TypeHelper.getValueVectorClass(minor, mode); - - JExpression arr = JExpr.newArray(cg.getModel().INT).add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0])); - comparisonVectorInitBatch.add( - ((JExpression) incomingBatchesVar.component(JExpr.lit(b))) - .invoke("getValueAccessorById") - .arg(cg.getModel()._ref(cmpVectorClass).boxify().dotclass()) - .arg(arr) - .invoke("getValueVector")); + g.setMappingSet(COPIER_MAPPING_SET); + CopyUtil.generateCopies(g, batch, true); + g.setMappingSet(MAIN_MAPPING); + MergingReceiverGeneratorBase merger = context.getImplementationClass(cg); - } - comparisonVectorInit.add(comparisonVectorInitBatch); + merger.doSetup(context, batch, outgoingContainer); + return merger; + } catch (ClassTransformationException | IOException e) { + throw new SchemaChangeException(e); } + } - cg.getSetupBlock().assign(comparisonVectors, comparisonVectorInit); - - int comparisonVectorIndex = 0; - for (ValueVectorReadExpression vvRead : cmpExpressions.get(0)) { - // generate the comparison statements based on the first batch (assumes comparison fields are homogeneous) - TypeProtos.DataMode mode = vvRead.getMajorType().getMode(); - TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType(); - JType vectorType = cg.getModel()._ref(TypeHelper.getValueVectorClass(minor, mode)); - JType valueType = TypeHelper.getHolderType(cg.getModel(), minor, mode); - - // set up a holding container expression for left-hand side of function call - JVar leftVar = cg.getEvalBlock().decl(valueType, "leftValue" + comparisonVectorIndex, JExpr._new(valueType)); - cg.getEvalBlock().add(((JExpression) JExpr.cast(vectorType, - ((JExpression) comparisonVectors - .component(JExpr.direct("leftNode.batchId"))) - .component(JExpr.lit(comparisonVectorIndex)))) - .invoke("getAccessor") - .invoke("get") - .arg(JExpr.direct("leftNode.valueIndex")) - .arg(leftVar)); - - ClassGenerator.HoldingContainer left = new ClassGenerator.HoldingContainer(vvRead.getMajorType(), - leftVar, - leftVar.ref("value"), - leftVar.ref("isSet")); - - // set up a holding container expression for right-hand side of function call - JVar rightVar = cg.getEvalBlock().decl(valueType, "rightValue" + comparisonVectorIndex, JExpr._new(valueType)); - cg.getEvalBlock().add(((JExpression) JExpr.cast(vectorType, - ((JExpression) comparisonVectors - .component(JExpr.direct("rightNode.batchId"))) - .component(JExpr.lit(comparisonVectorIndex)))) - .invoke("getAccessor") - .invoke("get") - .arg(JExpr.direct("rightNode.valueIndex")) - .arg(rightVar)); - - ClassGenerator.HoldingContainer right = new ClassGenerator.HoldingContainer(vvRead.getMajorType(), - rightVar, - rightVar.ref("value"), - rightVar.ref("isSet")); - - // generate the comparison function + public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); + public final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); + public final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); + GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null); + public final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING); + + private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException { + g.setMappingSet(MAIN_MAPPING); + + for(Ordering od : popConfig.getOrderings()){ + // first, we rewrite the evaluation stack for each side of the comparison. + ErrorCollector collector = new ErrorCollectorImpl(); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); + if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + g.setMappingSet(LEFT_MAPPING); + HoldingContainer left = g.addExpr(expr, false); + g.setMappingSet(RIGHT_MAPPING); + HoldingContainer right = g.addExpr(expr, false); + g.setMappingSet(MAIN_MAPPING); + + // next we wrap the two comparison sides and add the expression block for the comparison. LogicalExpression fh = FunctionGenerationHelper.getComparator(left, right, context.getFunctionRegistry()); - ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); - - // generate less than/greater than checks (fixing results for ASCending vs. DESCending) - cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(1))) - ._then() - ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASCENDING ? 1 : -1)); - - cg.getEvalBlock()._if(out.getValue().eq(JExpr.lit(-1))) - ._then() - ._return(JExpr.lit(config.getOrderings().get(comparisonVectorIndex).getDirection() == Direction.ASCENDING ? -1 : 1)); - - ++comparisonVectorIndex; - } + HoldingContainer out = g.addExpr(fh, false); + JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - // if all expressions are equal, return 0 - cg.getEvalBlock()._return(JExpr.lit(0)); - - // allocate a new array for outgoing vectors - cg.getSetupBlock().assign(outgoingVectors, JExpr.newArray(cg.getModel().ref(ValueVector.class), fieldsPerBatch)); - - // generate copy function and setup outgoing batches - cg.setMappingSet(MergingReceiverGeneratorBase.copyMapping); - int fieldIdx = 0; - for (VectorWrapper<?> vvOut : outgoingContainer) { - // declare outgoing value vectors - JVar outgoingVV = cg.declareVectorValueSetupAndMember("outgoingBatch", - new TypedFieldId(vvOut.getField().getType(), - vvOut.isHyper(), fieldIdx)); - - // assign to the appropriate slot in the outgoingVector array (in order of iteration) - cg.getSetupBlock().assign(outgoingVectors.component(JExpr.lit(fieldIdx)), outgoingVV); - - // get the vector's type info - Class<?> vvType = TypeHelper.getValueVectorClass(vvOut.getField().getType().getMinorType(), - vvOut.getField().getType().getMode()); - JClass vvClass = cg.getModel().ref(vvType); - - // generate the copyFrom() invocation with explicit cast to the appropriate type; for example: - // ((IntVector) outgoingVectors[i]).copyFrom(inIndex, - // outgoingBatch.getRecordCount(), - // (IntVector) vv1); - cg.getEvalBlock()._if( - ((JExpression) JExpr.cast(vvClass, outgoingVectors.component(JExpr.lit(fieldIdx)))) - .invoke("copyFromSafe") - .arg(inIndex) - .arg(outIndex) - .arg(JExpr.cast(vvClass, - ((JExpression) incomingVectors.component(JExpr.direct("inBatch"))) - .component(JExpr.lit(fieldIdx)))).not())._then()._return(JExpr.FALSE); - - ++fieldIdx; + if(od.getDirection() == Direction.ASCENDING){ + jc._then()._return(out.getValue()); + }else{ + jc._then()._return(out.getValue().minus()); + } } - cg.rotateBlock(); - cg.getEvalBlock()._return(JExpr.TRUE); - // compile generated code and call the generated setup method - MergingReceiverGeneratorBase newMerger; - try { - newMerger = context.getImplementationClass(cg); - newMerger.doSetup(context, batchLoaders, this); - } catch (ClassTransformationException | IOException e) { - throw new SchemaChangeException("Failure while attempting to load generated class", e); - } - return newMerger; + g.getEvalBlock()._return(JExpr.lit(0)); } /** @@ -695,7 +504,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> * @param node Reference to the next record to copy from the incoming batches */ private boolean copyRecordToOutgoingBatch(Node node) { - if (!merger.doCopy(node.batchId, node.valueIndex, outgoingPosition)) { + int inIndex = (node.batchId << 16) + node.valueIndex; + if (!merger.doCopy(inIndex, outgoingPosition)) { return false; } else { outgoingPosition++;
