[SYSTEMML-2279] Performance spark ctable (1-pass, fused reblock)

This patch significantly improves the performance of the spark ctable
instruction. So far, we constructed double outputs, converted them to
cells, aggregated the cells, determined dimensions, and finally used a
reblock to bring the output into binary block format. For large and
ultra-sparse matrices, this caused a redundant pass over the expensive
cell conversion and lots of unnecessary shuffling.

Instead, we now scan the inputs to determine the output dimensions if
necessary, locally aggregate all cells of a partition, and directly
output non-zero blocks, which feed together with injected empty blocks
(via union) into a fused reblock with global aggregation. In addition,
this patch includes numerous smaller improvements to utilize the
existing cluster parallelism and improve memory efficiency, and thus
reduce the garbage collection overhead.

On a scenario of sum(table(seq(1,1e9),1+seq(1,1e9)/1000)) with disabled
rewrites and together with the changes from SYSTEMML-2279 through 2282,
this patch improved the end-to-end runtime from 3163s to 455s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b3fef523
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b3fef523
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b3fef523

Branch: refs/heads/master
Commit: b3fef523c1057c0c82935954210686317492607c
Parents: 7cb43dd
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 26 21:27:11 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Apr 27 00:03:20 2018 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |   4 +-
 .../java/org/apache/sysml/hops/TernaryOp.java   |  11 +-
 src/main/java/org/apache/sysml/lops/Ctable.java |  11 +-
 .../runtime/compress/CompressedMatrixBlock.java |   2 +-
 .../instructions/spark/CtableSPInstruction.java | 473 ++++++-------------
 .../spark/utils/RDDConverterUtils.java          |  11 +-
 .../spark/utils/RDDConverterUtilsExt.java       |   6 +-
 .../runtime/matrix/data/MatrixPackedCell.java   |   2 +-
 .../sysml/runtime/matrix/data/MatrixValue.java  |   4 -
 .../runtime/matrix/mapred/ReblockBuffer.java    |  15 +-
 .../sysml/runtime/util/UtilFunctions.java       |   8 +
 11 files changed, 177 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index e9af001..72f9b81 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -686,7 +686,7 @@ public class OptimizerUtils
                                mc.getCols(), 
                                mc.getRowsPerBlock(), 
                                mc.getColsPerBlock(), 
-                               mc.getNonZeros());
+                               mc.getNonZerosBound());
        }
        
        /**
@@ -725,7 +725,7 @@ public class OptimizerUtils
                long tnrblks = (long)Math.ceil((double)rlen/brlen);
                long tncblks = (long)Math.ceil((double)clen/bclen);
                long nnz = (long) Math.ceil(sp * rlen * clen);
-               if( nnz < tnrblks * tncblks ) {
+               if( nnz <= tnrblks * tncblks ) {
                        long lrlen = Math.min(rlen, brlen);
                        long lclen = Math.min(clen, bclen);
                        return nnz * estimateSizeExactSparsity(lrlen, lclen, 1)

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/hops/TernaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/TernaryOp.java 
b/src/main/java/org/apache/sysml/hops/TernaryOp.java
index c7c7832..b6e62ff 100644
--- a/src/main/java/org/apache/sysml/hops/TernaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/TernaryOp.java
@@ -428,18 +428,11 @@ public class TernaryOp extends Hop
                        ternary.getOutputParameters().setDimensions(_dim1, 
_dim2, getRowsInBlock(), getColsInBlock(), -1);
                        setLineNumbers(ternary);
                        
-                       //force blocked output in CP (see below), otherwise 
binarycell
-                       if ( et == ExecType.SPARK ) {
-                               
ternary.getOutputParameters().setDimensions(_dim1, _dim2, -1, -1, -1);
-                               setRequiresReblock( true );
-                       }
-                       else
-                               
ternary.getOutputParameters().setDimensions(_dim1, _dim2, getRowsInBlock(), 
getColsInBlock(), -1);
+                       //force blocked output in CP and SPARK
+                       ternary.getOutputParameters().setDimensions(_dim1, 
_dim2, getRowsInBlock(), getColsInBlock(), -1);
                        
                        //ternary opt, w/o reblock in CP
                        setLops(ternary);
-                       
-                       
                }
                else //MR
                {

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/lops/Ctable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Ctable.java 
b/src/main/java/org/apache/sysml/lops/Ctable.java
index ff9c720..127754e 100644
--- a/src/main/java/org/apache/sysml/lops/Ctable.java
+++ b/src/main/java/org/apache/sysml/lops/Ctable.java
@@ -42,7 +42,16 @@ public class Ctable extends Lop
                CTABLE_TRANSFORM_HISTOGRAM, 
                CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM, 
                CTABLE_EXPAND_SCALAR_WEIGHT, 
-               INVALID 
+               INVALID;
+               public boolean hasSecondInput() {
+                       return this == CTABLE_TRANSFORM
+                               || this == CTABLE_EXPAND_SCALAR_WEIGHT
+                               || this == CTABLE_TRANSFORM_SCALAR_WEIGHT;
+               }
+               public boolean hasThirdInput() {
+                       return this == CTABLE_TRANSFORM
+                               || this == CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM;
+               }
        }
        
        OperationTypes operation;

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index d1df033..53298c8 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -1966,7 +1966,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        }
 
        @Override
-       public void incrementalAggregate(AggregateOperator aggOp, MatrixValue 
correction, MatrixValue newWithCorrection) {
+       public void incrementalAggregate(AggregateOperator aggOp, MatrixValue 
correction, MatrixValue newWithCorrection, boolean deep) {
                throw new DMLRuntimeException("CompressedMatrixBlock: 
incrementalAggregate not supported.");
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
index bf2cc91..65e619c 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CtableSPInstruction.java
@@ -19,13 +19,14 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
 
@@ -34,7 +35,6 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.functionobjects.CTable;
 import org.apache.sysml.runtime.instructions.Instruction;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
@@ -43,15 +43,10 @@ import 
org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CTableMap;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
-import org.apache.sysml.runtime.matrix.operators.Operator;
-import org.apache.sysml.runtime.matrix.operators.SimpleOperator;
+import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.LongLongDoubleHashMap.ADoubleEntry;
-import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class CtableSPInstruction extends ComputationSPInstruction {
        private String _outDim1;
@@ -106,401 +101,209 @@ public class CtableSPInstruction extends 
ComputationSPInstruction {
        public void processInstruction(ExecutionContext ec) {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
        
+               Ctable.OperationTypes ctableOp = 
Ctable.findCtableOperationByInputDataTypes(
+                       input1.getDataType(), input2.getDataType(), 
input3.getDataType());
+               ctableOp = _isExpand ? 
Ctable.OperationTypes.CTABLE_EXPAND_SCALAR_WEIGHT : ctableOp;
+               
                //get input rdd handle
                JavaPairRDD<MatrixIndexes,MatrixBlock> in1 = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = null;
-               JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = null;
-               double scalar_input2 = -1, scalar_input3 = -1;
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in2 = 
!ctableOp.hasSecondInput() ? null :
+                       sec.getBinaryBlockRDDHandleForVariable( 
input2.getName() );
                
-               Ctable.OperationTypes ctableOp = 
Ctable.findCtableOperationByInputDataTypes(
-                               input1.getDataType(), input2.getDataType(), 
input3.getDataType());
-               ctableOp = _isExpand ? 
Ctable.OperationTypes.CTABLE_EXPAND_SCALAR_WEIGHT : ctableOp;
+               JavaPairRDD<MatrixIndexes,MatrixBlock> in3 = null;
+               double s2 = -1, s3 = -1; //scalars
                
                MatrixCharacteristics mc1 = 
sec.getMatrixCharacteristics(input1.getName());
                MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                
-               // First get the block sizes and then set them as -1 to allow 
for binary cell reblock
-               int brlen = mc1.getRowsPerBlock();
-               int bclen = mc1.getColsPerBlock();
+               // handle known/unknown dimensions
+               long dim1 = (_dim1Literal ? (long) Double.parseDouble(_outDim1) 
:
+                       (sec.getScalarInput(_outDim1, ValueType.DOUBLE, 
false)).getLongValue());
+               long dim2 = (_dim2Literal ? (long) Double.parseDouble(_outDim2) 
:
+                       (sec.getScalarInput(_outDim2, ValueType.DOUBLE, 
false)).getLongValue());
+               if( dim1 == -1 && dim2 == -1 ) {
+                       //note: if we need to determine the dimensions to we do 
so before 
+                       //creating cells to avoid unnecessary caching, repeated 
joins, etc.
+                       dim1 = (long) RDDAggregateUtils.max(in1);
+                       dim2 = ctableOp.hasSecondInput() ? (long) 
RDDAggregateUtils.max(in2) :
+                               sec.getScalarInput(input3).getLongValue();
+               }
+               mcOut.set(dim1, dim2, mc1.getRowsPerBlock(), 
mc1.getColsPerBlock());
+               mcOut.setNonZerosBound(mc1.getRows());
                
-               JavaPairRDD<MatrixIndexes, ArrayList<MatrixBlock>> inputMBs = 
null;
-               JavaPairRDD<MatrixIndexes, CTableMap> ctables = null;
-               JavaPairRDD<MatrixIndexes, Double> bincellsNoFilter = null;
-               boolean setLineage2 = false;
-               boolean setLineage3 = false;
+               //compute preferred degree of parallelism
+               int numParts = Math.max(4 * (mc1.dimsKnown() ?
+                       SparkUtils.getNumPreferredPartitions(mc1) : 
in1.getNumPartitions()),
+                       SparkUtils.getNumPreferredPartitions(mcOut));
+               
+               JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
                switch(ctableOp) {
                        case CTABLE_TRANSFORM: //(VECTOR)
                                // F=ctable(A,B,W) 
-                               in2 = sec.getBinaryBlockRDDHandleForVariable( 
input2.getName() );
                                in3 = sec.getBinaryBlockRDDHandleForVariable( 
input3.getName() );
-                               setLineage2 = true;
-                               setLineage3 = true;
-                               
-                               inputMBs = in1.cogroup(in2).cogroup(in3)
-                                                       .mapToPair(new 
MapThreeMBIterableIntoAL());
-                               
-                               ctables = inputMBs.mapToPair(new 
PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-                                                       scalar_input3, 
this.instString, (SimpleOperator)_optr, _ignoreZeros));
+                               out = in1.join(in2, numParts).join(in3, 
numParts)
+                                       .mapValues(new MapJoinSignature3())
+                                       .mapPartitionsToPair(new 
CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
                                break;
                        
-                               
                        case CTABLE_EXPAND_SCALAR_WEIGHT: //(VECTOR)
-                               // F = ctable(seq,A) or F = ctable(seq,B,1)
-                               scalar_input3 = 
sec.getScalarInput(input3.getName(), input3.getValueType(), 
input3.isLiteral()).getDoubleValue();
-                               if(scalar_input3 == 1) {
-                                       in2 = 
sec.getBinaryBlockRDDHandleForVariable( input2.getName() );
-                                       setLineage2 = true;
-                                       bincellsNoFilter = 
in2.flatMapToPair(new ExpandScalarCtableOperation(brlen));
-                                       break;
-                               }
                        case CTABLE_TRANSFORM_SCALAR_WEIGHT: //(VECTOR/MATRIX)
                                // F = ctable(A,B) or F = ctable(A,B,1)
-                               in2 = sec.getBinaryBlockRDDHandleForVariable( 
input2.getName() );
-                               setLineage2 = true;
-
-                               scalar_input3 = 
sec.getScalarInput(input3.getName(), input3.getValueType(), 
input3.isLiteral()).getDoubleValue();
-                               inputMBs = in1.cogroup(in2).mapToPair(new 
MapTwoMBIterableIntoAL());
-                               
-                               ctables = inputMBs.mapToPair(new 
PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-                                               scalar_input3, this.instString, 
(SimpleOperator)_optr, _ignoreZeros));
+                               s3 = 
sec.getScalarInput(input3).getDoubleValue();
+                               out = in1.join(in2, numParts).mapValues(new 
MapJoinSignature2())
+                                       .mapPartitionsToPair(new 
CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
                                break;
                                
                        case CTABLE_TRANSFORM_HISTOGRAM: //(VECTOR)
                                // F=ctable(A,1) or F = ctable(A,1,1)
-                               scalar_input2 = 
sec.getScalarInput(input2.getName(), input2.getValueType(), 
input2.isLiteral()).getDoubleValue();
-                               scalar_input3 = 
sec.getScalarInput(input3.getName(), input3.getValueType(), 
input3.isLiteral()).getDoubleValue();
-                               inputMBs = in1.mapToPair(new MapMBIntoAL());
-                               
-                               ctables = inputMBs.mapToPair(new 
PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-                                               scalar_input3, this.instString, 
(SimpleOperator)_optr, _ignoreZeros));
+                               s2 = 
sec.getScalarInput(input2).getDoubleValue();
+                               s3 = 
sec.getScalarInput(input3).getDoubleValue();
+                               out = in1.mapValues(new MapJoinSignature1())
+                                       .mapPartitionsToPair(new 
CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
                                break;
                                
                        case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: //(VECTOR)
                                // F=ctable(A,1,W)
                                in3 = sec.getBinaryBlockRDDHandleForVariable( 
input3.getName() );
-                               setLineage3 = true;
-                               
-                               scalar_input2 = 
sec.getScalarInput(input2.getName(), input2.getValueType(), 
input2.isLiteral()).getDoubleValue();
-                               inputMBs = in1.cogroup(in3).mapToPair(new 
MapTwoMBIterableIntoAL());
-                               
-                               ctables = inputMBs.mapToPair(new 
PerformCTableMapSideOperation(ctableOp, scalar_input2, 
-                                               scalar_input3, this.instString, 
(SimpleOperator)_optr, _ignoreZeros));
+                               s2 = 
sec.getScalarInput(input2).getDoubleValue();
+                               out = in1.join(in3, numParts).mapValues(new 
MapJoinSignature2())
+                                       .mapPartitionsToPair(new 
CTableFunction(ctableOp, s2, s3, _ignoreZeros, mcOut));
                                break;
                        
                        default:
                                throw new DMLRuntimeException("Encountered an 
invalid ctable operation ("+ctableOp+") while executing instruction: " + 
this.toString());
                }
                
-               // Now perform aggregation on ctables to get binaryCells 
-               if(bincellsNoFilter == null && ctables != null) {
-                       bincellsNoFilter =  
-                                       ctables.values()
-                                       .flatMapToPair(new 
ExtractBinaryCellsFromCTable());
-                       bincellsNoFilter = 
RDDAggregateUtils.sumCellsByKeyStable(bincellsNoFilter);
-               }
-               else if(!(bincellsNoFilter != null && ctables == null)) {
-                       throw new DMLRuntimeException("Incorrect ctable 
operation");
-               }
-               
-               // handle known/unknown dimensions
-               long outputDim1 = (_dim1Literal ? (long) 
Double.parseDouble(_outDim1) : (sec.getScalarInput(_outDim1, ValueType.DOUBLE, 
false)).getLongValue());
-               long outputDim2 = (_dim2Literal ? (long) 
Double.parseDouble(_outDim2) : (sec.getScalarInput(_outDim2, ValueType.DOUBLE, 
false)).getLongValue());
-               MatrixCharacteristics mcBinaryCells = null;
-               boolean findDimensions = (outputDim1 == -1 && outputDim2 == 
-1); 
-               
-               if( !findDimensions ) {
-                       if((outputDim1 == -1 && outputDim2 != -1) || 
(outputDim1 != -1 && outputDim2 == -1))
-                               throw new DMLRuntimeException("Incorrect output 
dimensions passed to TernarySPInstruction:" + outputDim1 + " " + outputDim2);
-                       else 
-                               mcBinaryCells = new 
MatrixCharacteristics(outputDim1, outputDim2, brlen, bclen);        
-                       
-                       // filtering according to given dimensions
-                       bincellsNoFilter = bincellsNoFilter
-                                       .filter(new 
FilterCells(mcBinaryCells.getRows(), mcBinaryCells.getCols()));
-               }
-               
-               // convert double values to matrix cell
-               JavaPairRDD<MatrixIndexes, MatrixCell> binaryCells = 
bincellsNoFilter
-                               .mapToPair(new ConvertToBinaryCell());
-               
-               // find dimensions if necessary (w/ cache for reblock)
-               if( findDimensions ) {                                          
-                       binaryCells = 
SparkUtils.cacheBinaryCellRDD(binaryCells);
-                       mcBinaryCells = 
SparkUtils.computeMatrixCharacteristics(binaryCells);
-               }
+               //perform fused aggregation and reblock
+               out = 
out.union(SparkUtils.getEmptyBlockRDD(sec.getSparkContext(), mcOut));
+               out = RDDAggregateUtils.sumByKeyStable(out, numParts, false);
                
                //store output rdd handle
-               sec.setRDDHandleForVariable(output.getName(), binaryCells);
-               mcOut.set(mcBinaryCells);
-               // Since we are outputing binary cells, we set block sizes = -1
-               mcOut.setRowsPerBlock(-1); mcOut.setColsPerBlock(-1);
+               sec.setRDDHandleForVariable(output.getName(), out);
                sec.addLineageRDD(output.getName(), input1.getName());
-               if(setLineage2)
+               if( ctableOp.hasSecondInput() )
                        sec.addLineageRDD(output.getName(), input2.getName());
-               if(setLineage3)
+               if( ctableOp.hasThirdInput() )
                        sec.addLineageRDD(output.getName(), input3.getName());
-       }       
-
-       private static class ExpandScalarCtableOperation implements 
PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>, MatrixIndexes, Double> 
-       {
-               private static final long serialVersionUID = 
-12552669148928288L;
-       
-               private int _brlen;
-               
-               public ExpandScalarCtableOperation(int brlen) {
-                       _brlen = brlen;
-               }
-
-               @Override
-               public Iterator<Tuple2<MatrixIndexes, Double>> 
call(Tuple2<MatrixIndexes, MatrixBlock> arg0) 
-                       throws Exception 
-               {
-                       MatrixIndexes ix = arg0._1();
-                       MatrixBlock mb = arg0._2(); //col-vector
-                       
-                       //create an output cell per matrix block row (aligned 
w/ original source position)
-                       ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new 
ArrayList<>();
-                       CTable ctab = CTable.getCTableFnObject();
-                       for( int i=0; i<mb.getNumRows(); i++ )
-                       {
-                               //compute global target indexes (via ctable obj 
for error handling consistency)
-                               long row = 
UtilFunctions.computeCellIndex(ix.getRowIndex(), _brlen, i);
-                               double v2 = mb.quickGetValue(i, 0);
-                               Pair<MatrixIndexes,Double> p = 
ctab.execute(row, v2, 1.0);
-                               
-                               //indirect construction over pair to avoid 
tuple2 dependency in general ctable obj
-                               if( p.getKey().getRowIndex() >= 1 ) //filter 
rejected entries
-                                       retVal.add(new Tuple2<>(p.getKey(), 
p.getValue()));
-                       }
-                       
-                       return retVal.iterator();
-               }
-       }
-       
-       private static class MapTwoMBIterableIntoAL implements 
PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,Iterable<MatrixBlock>>>,
 MatrixIndexes, ArrayList<MatrixBlock>> {
-
-               private static final long serialVersionUID = 
271459913267735850L;
-
-               private static MatrixBlock extractBlock(Iterable<MatrixBlock> 
blks, MatrixBlock retVal) throws Exception {
-                       for(MatrixBlock blk1 : blks) {
-                               if(retVal != null) {
-                                       throw new Exception("ERROR: More than 1 
matrixblock found for one of the inputs at a given index");
-                               }
-                               retVal = blk1;
-                       }
-                       if(retVal == null) {
-                               throw new Exception("ERROR: No matrixblock 
found for one of the inputs at a given index");
-                       }
-                       return retVal;
-               }
-               
-               @Override
-               public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-                               Tuple2<MatrixIndexes, 
Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>>> kv)
-                               throws Exception {
-                       MatrixBlock in1 = null; MatrixBlock in2 = null;
-                       in1 = extractBlock(kv._2._1, in1);
-                       in2 = extractBlock(kv._2._2, in2);
-                       // Now return unflatten AL
-                       ArrayList<MatrixBlock> inputs = new ArrayList<>();
-                       inputs.add(in1); inputs.add(in2);  
-                       return new Tuple2<>(kv._1, inputs);
-               }
-               
        }
-       
-       private static class MapThreeMBIterableIntoAL implements 
PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<Tuple2<Iterable<MatrixBlock>,Iterable<MatrixBlock>>>,Iterable<MatrixBlock>>>,
 MatrixIndexes, ArrayList<MatrixBlock>> {
-
-               private static final long serialVersionUID = 
-4873754507037646974L;
-               
-               private static MatrixBlock extractBlock(Iterable<MatrixBlock> 
blks, MatrixBlock retVal) throws Exception {
-                       for(MatrixBlock blk1 : blks) {
-                               if(retVal != null) {
-                                       throw new Exception("ERROR: More than 1 
matrixblock found for one of the inputs at a given index");
-                               }
-                               retVal = blk1;
-                       }
-                       if(retVal == null) {
-                               throw new Exception("ERROR: No matrixblock 
found for one of the inputs at a given index");
-                       }
-                       return retVal;
-               }
-
-               @Override
-               public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-                               Tuple2<MatrixIndexes, 
Tuple2<Iterable<Tuple2<Iterable<MatrixBlock>, Iterable<MatrixBlock>>>, 
Iterable<MatrixBlock>>> kv)
-                               throws Exception {
-                       MatrixBlock in1 = null; MatrixBlock in2 = null; 
MatrixBlock in3 = null;
-                       
-                       for(Tuple2<Iterable<MatrixBlock>, 
Iterable<MatrixBlock>> blks : kv._2._1) {
-                               in1 = extractBlock(blks._1, in1);
-                               in2 = extractBlock(blks._2, in2);
-                       }
-                       in3 = extractBlock(kv._2._2, in3);
-                       
-                       // Now return unflatten AL
-                       ArrayList<MatrixBlock> inputs = new ArrayList<>();
-                       inputs.add(in1); inputs.add(in2); inputs.add(in3);  
-                       return new Tuple2<>(kv._1, inputs);
-               }
-               
-       }
-       
-       private static class PerformCTableMapSideOperation implements 
PairFunction<Tuple2<MatrixIndexes,ArrayList<MatrixBlock>>, MatrixIndexes, 
CTableMap> {
 
+       private static class CTableFunction implements 
PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>>, 
MatrixIndexes, MatrixBlock> 
+       {
                private static final long serialVersionUID = 
5348127596473232337L;
 
-               Ctable.OperationTypes ctableOp;
-               double scalar_input2; double scalar_input3;
-               String instString;
-               Operator optr;
-               boolean ignoreZeros;
+               private final Ctable.OperationTypes _ctableOp;
+               private final double _scalar_input2, _scalar_input3;
+               private final boolean _ignoreZeros;
+               private final long _dim1, _dim2;
+               private final int _brlen, _bclen;
                
-               public PerformCTableMapSideOperation(Ctable.OperationTypes 
ctableOp, double scalar_input2, double scalar_input3, String instString, 
Operator optr, boolean ignoreZeros) {
-                       this.ctableOp = ctableOp;
-                       this.scalar_input2 = scalar_input2;
-                       this.scalar_input3 = scalar_input3;
-                       this.instString = instString;
-                       this.optr = optr;
-                       this.ignoreZeros = ignoreZeros;
+               public CTableFunction(Ctable.OperationTypes ctableOp, double 
s2, double s3, boolean ignoreZeros, MatrixCharacteristics mcOut) {
+                       this(ctableOp, s2, s3, ignoreZeros, false, mcOut);
                }
                
-               private static void expectedALSize(int length, 
ArrayList<MatrixBlock> al) throws Exception {
-                       if(al.size() != length) {
-                               throw new Exception("Expected arraylist of 
size:" + length + ", but found " + al.size());
-                       }
+               public CTableFunction(Ctable.OperationTypes ctableOp, double 
s2, double s3, boolean ignoreZeros, boolean emitEmpty, MatrixCharacteristics 
mcOut) {
+                       _ctableOp = ctableOp;
+                       _scalar_input2 = s2;
+                       _scalar_input3 = s3;
+                       _ignoreZeros = ignoreZeros;
+                       _dim1 = mcOut.getRows();
+                       _dim2 = mcOut.getCols();
+                       _brlen = mcOut.getRowsPerBlock();
+                       _bclen = mcOut.getColsPerBlock();
                }
                
                @Override
-               public Tuple2<MatrixIndexes, CTableMap> call(
-                               Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> 
kv) throws Exception {
-                       CTableMap ctableResult = new CTableMap();
-                       MatrixBlock ctableResultBlock = null;
-                       
-                       IndexedMatrixValue in1, in2, in3 = null;
-                       in1 = new IndexedMatrixValue(kv._1, kv._2.get(0));
-                       MatrixBlock matBlock1 = kv._2.get(0);
+               public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>> arg0)
+                       throws Exception
+               {
+                       CTableMap map = new CTableMap(); MatrixBlock block = 
null;
                        
-                       switch( ctableOp )
-                       {
-                               case CTABLE_TRANSFORM: {
-                                       in2 = new IndexedMatrixValue(kv._1, 
kv._2.get(1));
-                                       in3 = new IndexedMatrixValue(kv._1, 
kv._2.get(2));
-                                       expectedALSize(3, kv._2);
-                                       
-                                       if(in1==null || in2==null || in3 == 
null )
-                                               break;  
-                                       else
-                                               
OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), 
in2.getIndexes(),
-                                                       in2.getValue(), 
in3.getIndexes(), in3.getValue(), ctableResult, ctableResultBlock, optr);
-                                       break;
-                               }
-                               case CTABLE_TRANSFORM_SCALAR_WEIGHT: 
-                               case CTABLE_EXPAND_SCALAR_WEIGHT:
-                               {
-                                       // 3rd input is a scalar
-                                       in2 = new IndexedMatrixValue(kv._1, 
kv._2.get(1));
-                                       expectedALSize(2, kv._2);
-                                       if(in1==null || in2==null )
+                       //local aggregation of entire partition
+                       while( arg0.hasNext() ) {
+                               Tuple2<MatrixIndexes,MatrixBlock[]> tmp = 
arg0.next();
+                               MatrixIndexes ix = tmp._1();
+                               MatrixBlock[] mb = tmp._2();
+                               
+                               switch( _ctableOp ) {
+                                       case CTABLE_TRANSFORM: {
+                                               
OperationsOnMatrixValues.performCtable(ix, mb[0], ix,
+                                                       mb[1], ix, mb[2], map, 
block, null);
                                                break;
-                                       else
-                                               
matBlock1.ctableOperations((SimpleOperator)optr, kv._2.get(1), scalar_input3, 
ignoreZeros, ctableResult, ctableResultBlock);
+                                       }
+                                       case CTABLE_EXPAND_SCALAR_WEIGHT:
+                                       case CTABLE_TRANSFORM_SCALAR_WEIGHT: {
+                                               // 3rd input is a scalar
+                                               mb[0].ctableOperations(null, 
mb[1], _scalar_input3, _ignoreZeros, map, block);
                                                break;
-                               }
-                               case CTABLE_TRANSFORM_HISTOGRAM: {
-                                       expectedALSize(1, kv._2);
-                                       
OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), 
scalar_input2, 
-                                                       scalar_input3, 
ctableResult, ctableResultBlock, optr);
-                                       break;
-                               }
-                               case CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: {
-                                       // 2nd and 3rd inputs are scalars
-                                       expectedALSize(2, kv._2);
-                                       in3 = new IndexedMatrixValue(kv._1, 
kv._2.get(1)); // Note: kv._2.get(1), not kv._2.get(2)
-                                       
-                                       if(in1==null || in3==null)
+                                       }
+                                       case CTABLE_TRANSFORM_HISTOGRAM: {
+                                               
OperationsOnMatrixValues.performCtable(ix, mb[0],
+                                                       _scalar_input2, 
_scalar_input3, map, block, null);
                                                break;
-                                       else
-                                               
OperationsOnMatrixValues.performCtable(in1.getIndexes(), in1.getValue(), 
scalar_input2, 
-                                                               
in3.getIndexes(), in3.getValue(), ctableResult, ctableResultBlock, optr);       
        
-                                       break;
+                                       }
+                                       case 
CTABLE_TRANSFORM_WEIGHTED_HISTOGRAM: {
+                                               // 2nd and 3rd inputs are 
scalars
+                                               
OperationsOnMatrixValues.performCtable(ix, mb[0],
+                                                       _scalar_input2, ix, 
mb[1], map, block, null);
+                                               break;
+                                       }
+                                       default:
+                                               break;
+                               }
+                       }
+                       
+                       ReblockBuffer rbuff = new ReblockBuffer(Math.min(
+                               4*1024*1024, map.size()), _dim1, _dim2, _brlen, 
_bclen);
+                       ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new 
ArrayList<>();
+                       
+                       //append to buffer for blocked output
+                       Iterator<ADoubleEntry> iter = map.getIterator();
+                       while( iter.hasNext() ) {
+                               ADoubleEntry e = iter.next();
+                               if( e.getKey1() <= _dim1 && e.getKey2() <= 
_dim2 ) { 
+                                       if( rbuff.getSize() >= 
rbuff.getCapacity() )
+                                               flushBufferToList(rbuff, ret);
+                                       rbuff.appendCell(e.getKey1(), 
e.getKey2(), e.value);
                                }
-                               default:
-                                       throw new 
DMLRuntimeException("Unrecognized opcode in Tertiary Instruction: " + 
instString);
                        }
-                       return new Tuple2<>(kv._1, ctableResult);
+                       
+                       //final flush buffer
+                       if( rbuff.getSize() > 0 )
+                               flushBufferToList(rbuff, ret);
+                       
+                       return ret.iterator();
                }
-               
-       }
        
-       private static class MapMBIntoAL implements 
PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, 
ArrayList<MatrixBlock>> {
-
-               private static final long serialVersionUID = 
2068398913653350125L;
-
-               @Override
-               public Tuple2<MatrixIndexes, ArrayList<MatrixBlock>> call(
-                               Tuple2<MatrixIndexes, MatrixBlock> kv) throws 
Exception {
-                       ArrayList<MatrixBlock> retVal = new ArrayList<>();
-                       retVal.add(kv._2);
-                       return new Tuple2<>(kv._1, retVal);
+               protected void flushBufferToList( ReblockBuffer rbuff,  
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
+                       throws IOException, DMLRuntimeException
+               {
+                       rbuff.flushBufferToBinaryBlocks().stream() // prevent 
library dependencies
+                               .map(b -> 
SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b));
                }
-               
        }
        
-       private static class ExtractBinaryCellsFromCTable implements 
PairFlatMapFunction<CTableMap, MatrixIndexes, Double> {
+       public static class MapJoinSignature1 implements Function<MatrixBlock, 
MatrixBlock[]> {
+               private static final long serialVersionUID = 
-8819908424033945028L;
 
-               private static final long serialVersionUID = 
-5933677686766674444L;
-               
                @Override
-               public Iterator<Tuple2<MatrixIndexes, Double>> call(CTableMap 
ctableMap)
-                               throws Exception {
-                       ArrayList<Tuple2<MatrixIndexes, Double>> retVal = new 
ArrayList<>();
-                       Iterator<ADoubleEntry> iter = ctableMap.getIterator();
-                       while( iter.hasNext() ) {
-                               ADoubleEntry ijv = iter.next();
-                               long i = ijv.getKey1();
-                               long j =  ijv.getKey2();
-                               double v =  ijv.value;
-                               retVal.add(new Tuple2<>(new MatrixIndexes(i, 
j), v));
-                       }
-                       return retVal.iterator();
+               public MatrixBlock[] call(MatrixBlock v1) throws Exception {
+                       return ArrayUtils.toArray(v1);
                }
-               
        }
        
-       private static class ConvertToBinaryCell implements 
PairFunction<Tuple2<MatrixIndexes,Double>, MatrixIndexes, MatrixCell> {
-
-               private static final long serialVersionUID = 
7481186480851982800L;
-               
+       public static class MapJoinSignature2 implements 
Function<Tuple2<MatrixBlock,MatrixBlock>, MatrixBlock[]> {
+               private static final long serialVersionUID = 
7690448020081435520L;
                @Override
-               public Tuple2<MatrixIndexes, MatrixCell> call(
-                               Tuple2<MatrixIndexes, Double> kv) throws 
Exception {
-                       
-                       MatrixCell cell = new MatrixCell(kv._2().doubleValue());
-                       return new Tuple2<>(kv._1(), cell);
+               public MatrixBlock[] call(Tuple2<MatrixBlock, MatrixBlock> v1) 
throws Exception {
+                       return ArrayUtils.toArray(v1._1(), v1._2());
                }
-               
        }
        
-       private static class FilterCells implements 
Function<Tuple2<MatrixIndexes,Double>, Boolean> {
-               private static final long serialVersionUID = 
108448577697623247L;
-
-               long rlen; long clen;
-               public FilterCells(long rlen, long clen) {
-                       this.rlen = rlen;
-                       this.clen = clen;
-               }
-               
+       public static class MapJoinSignature3 implements 
Function<Tuple2<Tuple2<MatrixBlock,MatrixBlock>,MatrixBlock>, MatrixBlock[]> {
+               private static final long serialVersionUID = 
-5222678882354280164L;
                @Override
-               public Boolean call(Tuple2<MatrixIndexes, Double> kv) throws 
Exception {
-                       if(kv._1.getRowIndex() <= 0 || kv._1.getColumnIndex() 
<= 0) {
-                               throw new Exception("Incorrect cell values in 
TernarySPInstruction:" + kv._1);
-                       }
-                       if(kv._1.getRowIndex() <= rlen && 
kv._1.getColumnIndex() <= clen) {
-                               return true;
-                       }
-                       return false;
+               public MatrixBlock[] call(Tuple2<Tuple2<MatrixBlock, 
MatrixBlock>, MatrixBlock> v1) throws Exception {
+                       return ArrayUtils.toArray(v1._1()._1(), v1._1()._2(), 
v1._2());
                }
-               
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 6e647ee..29cd567 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -65,7 +65,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
@@ -491,10 +490,8 @@ public class RDDConverterUtils
                protected void flushBufferToList( ReblockBuffer rbuff,  
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret ) 
                        throws IOException, DMLRuntimeException
                {
-                       //temporary list of indexed matrix values to prevent 
library dependencies
-                       ArrayList<IndexedMatrixValue> rettmp = new 
ArrayList<>();
-                       rbuff.flushBufferToBinaryBlocks(rettmp);
-                       ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
+                       rbuff.flushBufferToBinaryBlocks().stream() // prevent 
library dependencies
+                               .map(b -> 
SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b));
                }
        }
 
@@ -574,11 +571,11 @@ public class RDDConverterUtils
        /////////////////////////////////
        // BINARYCELL-SPECIFIC FUNCTIONS
 
-       private static class BinaryCellToBinaryBlockFunction extends 
CellToBinaryBlockFunction implements 
PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixCell>>,MatrixIndexes,MatrixBlock>
 
+       public static class BinaryCellToBinaryBlockFunction extends 
CellToBinaryBlockFunction implements 
PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes,MatrixCell>>,MatrixIndexes,MatrixBlock>
 
        {
                private static final long serialVersionUID = 
3928810989462198243L;
 
-               protected BinaryCellToBinaryBlockFunction(MatrixCharacteristics 
mc) {
+               public BinaryCellToBinaryBlockFunction(MatrixCharacteristics 
mc) {
                        super(mc);
                }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index 77800e4..4871aee 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -389,10 +389,8 @@ public class RDDConverterUtilsExt
                private static void flushBufferToList( ReblockBuffer rbuff,  
ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
                        throws IOException, DMLRuntimeException
                {
-                       //temporary list of indexed matrix values to prevent 
library dependencies
-                       ArrayList<IndexedMatrixValue> rettmp = new 
ArrayList<IndexedMatrixValue>();
-                       rbuff.flushBufferToBinaryBlocks(rettmp);
-                       ret.addAll(SparkUtils.fromIndexedMatrixBlock(rettmp));
+                       rbuff.flushBufferToBinaryBlocks().stream() // prevent 
library dependencies
+                               .map(b -> 
SparkUtils.fromIndexedMatrixBlock(b)).forEach(b -> ret.add(b));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
index ecf44b6..e8d0316 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixPackedCell.java
@@ -73,7 +73,7 @@ public class MatrixPackedCell extends MatrixCell
        //with corrections
        @Override
        public void incrementalAggregate(AggregateOperator aggOp, MatrixValue 
correction, 
-                       MatrixValue newWithCorrection) {
+                       MatrixValue newWithCorrection, boolean deep) {
                incrementalAggregate(aggOp, newWithCorrection);
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
index 82b09e0..88a918d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixValue.java
@@ -127,10 +127,6 @@ public abstract class MatrixValue implements 
WritableComparable
        
        public abstract MatrixValue unaryOperations(UnaryOperator op, 
MatrixValue result);
        
-       public void incrementalAggregate(AggregateOperator aggOp, MatrixValue 
correction, MatrixValue newWithCorrection) {
-               incrementalAggregate(aggOp, correction, newWithCorrection, 
true);
-       }
-       
        public abstract void incrementalAggregate(AggregateOperator aggOp, 
MatrixValue correction, MatrixValue newWithCorrection, boolean deep);
        
        public abstract void incrementalAggregate(AggregateOperator aggOp, 
MatrixValue newWithCorrection);

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
index 7f273fb..8d6f2e6 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
@@ -23,8 +23,10 @@ package org.apache.sysml.runtime.matrix.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -223,11 +225,11 @@ public class ReblockBuffer
                _count = 0;
        }
 
-       public void flushBufferToBinaryBlocks( ArrayList<IndexedMatrixValue> 
outList ) 
+       public List<IndexedMatrixValue> flushBufferToBinaryBlocks() 
                throws IOException, DMLRuntimeException
        {
                if( _count == 0 )
-                       return;
+                       return Collections.emptyList();
                
                //Step 1) sort reblock buffer (blockwise, no in-block sorting!)
                Arrays.sort( _buff, 0 ,_count, new ReblockBufferComparator() );
@@ -248,7 +250,8 @@ public class ReblockBuffer
                        }
                }
                
-               //Step 3) output blocks 
+               //Step 3) output blocks
+               ArrayList<IndexedMatrixValue> ret = new ArrayList<>();
                boolean sparse = MatrixBlock.evalSparseFormatInMemory(_brlen, 
_bclen, _count/numBlocks);
                MatrixIndexes tmpIx = new MatrixIndexes();
                MatrixBlock tmpBlock = new MatrixBlock();
@@ -262,7 +265,7 @@ public class ReblockBuffer
                        
                        //output block and switch to next index pair
                        if( bi != cbi || bj != cbj ) {
-                               outputBlock(outList, tmpIx, tmpBlock);
+                               outputBlock(ret, tmpIx, tmpBlock);
                                cbi = bi;
                                cbj = bj;
                                tmpIx = new MatrixIndexes(bi, bj);
@@ -278,9 +281,9 @@ public class ReblockBuffer
                }
                
                //output last block 
-               outputBlock(outList, tmpIx, tmpBlock);
-               
+               outputBlock(ret, tmpIx, tmpBlock);
                _count = 0;
+               return ret;
        }
 
        private static void outputBlock( OutputCollector<Writable, Writable> 
out, MatrixIndexes key, TaggedAdaptivePartialBlock value, MatrixBlock block ) 

http://git-wip-us.apache.org/repos/asf/systemml/blob/b3fef523/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index f0911bb..1947c00 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -22,8 +22,11 @@ package org.apache.sysml.runtime.util;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -663,4 +666,9 @@ public class UtilFunctions
                        ret.add(element);
                return ret;
        }
+       
+       public static <T> Stream<T> getStream(Iterator<T> iter) {
+               Iterable<T> iterable = () -> iter;
+               return StreamSupport.stream(iterable.spliterator(), false);
+       }
 }

Reply via email to