[SYSTEMML-946] Improved shuffle-based data converters (avoid alloc)

This patch contains two improvements to our shuffle-based data
converters, both of which avoid unnecessary allocations and block copies
and hence reduce GC pressure and generally improve performance.

(1) Row sparsity estimates on dataframe-matrix conversion, which affect
the growth rate of sparse rows (2x vs 1.1x) and hence reduce
allocations.

(2) Improved mergeByKey primitive: we now use combineByKey instead of
reduceByKey to allocate a combiner block once and merge subsequent
blocks in-place without unnecessary allocations and copies. This applies
to all operations that rely on merging partial blocks. In a scenario of
csv-matrix conversion (1k x 5M, 4 nodes), this improved the runtime from
1,214s to 443s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/38d087a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/38d087a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/38d087a7

Branch: refs/heads/master
Commit: 38d087a767a23c5f43a2a1ab45d69bf9bc1b1934
Parents: 80a72d7
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Wed Sep 21 17:26:22 2016 -0700
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Wed Sep 21 17:26:22 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDAggregateUtils.java          | 132 ++++++-------------
 .../spark/utils/RDDConverterUtils.java          |   4 +-
 2 files changed, 46 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index c545c30..93bb1d0 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -100,7 +100,7 @@ public class RDDAggregateUtils
        {
                //stable sum of blocks per key, by passing correction blocks 
along with aggregates              
                JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-                               in.combineByKey( new 
CreateBlockCombinerFunction(), 
+                               in.combineByKey( new 
CreateCorrBlockCombinerFunction(), 
                                                             new 
MergeSumBlockValueFunction(), 
                                                             new 
MergeSumBlockCombinerFunction() );
                
@@ -117,28 +117,6 @@ public class RDDAggregateUtils
         * @param in
         * @return
         */
-//     public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( 
MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-//     {
-//             //stable sum of blocks per key, by passing correction blocks 
along with aggregates              
-//             JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-//                             in.combineByKey( new 
CreateBlockCombinerFunction(), 
-//                                                          new 
MergeSumBlockValueFunction(), 
-//                                                          new 
MergeSumBlockCombinerFunction(),
-//                                                          new 
BlockPartitioner(mc, in.partitions().size()));
-//             
-//             //strip-off correction blocks from                              
             
-//             JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
-//                             tmp.mapValues( new ExtractMatrixBlock() );
-//             
-//             //return the aggregate rdd
-//             return out;
-//     }
-       
-       /**
-        * 
-        * @param in
-        * @return
-        */
        public static JavaPairRDD<MatrixIndexes, Double> sumCellsByKeyStable( 
JavaPairRDD<MatrixIndexes, Double> in )
        {
                //stable sum of blocks per key, by passing correction blocks 
along with aggregates              
@@ -191,7 +169,7 @@ public class RDDAggregateUtils
        {
                //stable sum of blocks per key, by passing correction blocks 
along with aggregates              
                JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-                               in.combineByKey( new 
CreateBlockCombinerFunction(), 
+                               in.combineByKey( new 
CreateCorrBlockCombinerFunction(), 
                                                             new 
MergeAggBlockValueFunction(aop), 
                                                             new 
MergeAggBlockCombinerFunction(aop) );
                
@@ -204,30 +182,6 @@ public class RDDAggregateUtils
        }
        
        /**
-        * 
-        * @param mc
-        * @param in
-        * @param aop
-        * @return
-        */
-//     public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( 
MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
AggregateOperator aop )
-//     {
-//             //stable sum of blocks per key, by passing correction blocks 
along with aggregates              
-//             JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-//                             in.combineByKey( new 
CreateBlockCombinerFunction(), 
-//                                                          new 
MergeAggBlockValueFunction(aop), 
-//                                                          new 
MergeAggBlockCombinerFunction(aop),
-//                                                          new 
BlockPartitioner(mc, in.partitions().size()));
-//             
-//             //strip-off correction blocks from                              
             
-//             JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
-//                             tmp.mapValues( new ExtractMatrixBlock() );
-//             
-//             //return the aggregate rdd
-//             return out;
-//     }
-       
-       /**
         * Merges disjoint data of all blocks per key.
         * 
         * Note: The behavior of this method is undefined for both sparse and 
dense data if the 
@@ -238,24 +192,14 @@ public class RDDAggregateUtils
         */
        public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in )
        {
-               return in.reduceByKey(
-                               new MergeBlocksFunction());
+               //use combine by key to avoid unnecessary deep block copies, 
i.e.
+               //create combiner block once and merge remaining blocks 
in-place.
+               return in.combineByKey( new CreateBlockCombinerFunction(), 
+                           new MergeBlocksFunction(false), 
+                           new MergeBlocksFunction(false) );
        }
        
        /**
-        * 
-        * @param mc
-        * @param in
-        * @return
-        */
-//     public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( 
MatrixCharacteristics mc, JavaPairRDD<MatrixIndexes, MatrixBlock> in )
-//     {
-//             return in.reduceByKey(
-//                             new BlockPartitioner(mc, 
in.partitions().size()),
-//                             new MergeBlocksFunction());
-//     }
-       
-       /**
         * Merges disjoint data of all blocks per key.
         * 
         * Note: The behavior of this method is undefined for both sparse and 
dense data if the 
@@ -268,13 +212,13 @@ public class RDDAggregateUtils
        {
                return in.combineByKey( new CreateRowBlockCombinerFunction(), 
                                                            new 
MergeRowBlockValueFunction(), 
-                                                           new 
MergeRowBlockCombinerFunction() );
+                                                           new 
MergeBlocksFunction(false) );
        }
        
        /**
         * 
         */
-       private static class CreateBlockCombinerFunction implements 
Function<MatrixBlock, CorrMatrixBlock> 
+       private static class CreateCorrBlockCombinerFunction implements 
Function<MatrixBlock, CorrMatrixBlock> 
        {
                private static final long serialVersionUID = 
-3666451526776017343L;
 
@@ -349,6 +293,22 @@ public class RDDAggregateUtils
        /**
         *
         */
+       private static class CreateBlockCombinerFunction implements 
Function<MatrixBlock, MatrixBlock> 
+       {
+               private static final long serialVersionUID = 
1987501624176848292L;
+
+               @Override
+               public MatrixBlock call(MatrixBlock arg0) 
+                       throws Exception 
+               {
+                       //create deep copy of given block
+                       return new MatrixBlock(arg0);
+               }       
+       }
+       
+       /**
+        *
+        */
        private static class CreateRowBlockCombinerFunction implements 
Function<RowMatrixBlock, MatrixBlock> 
        {
                private static final long serialVersionUID = 
2866598914232118425L;
@@ -392,26 +352,6 @@ public class RDDAggregateUtils
        /**
         * 
         */
-       private static class MergeRowBlockCombinerFunction implements 
Function2<MatrixBlock, MatrixBlock, MatrixBlock> 
-       {
-               private static final long serialVersionUID = 
5142967296705548000L;
-
-               @Override
-               public MatrixBlock call(MatrixBlock arg0, MatrixBlock arg1) 
-                       throws Exception 
-               {
-                       //merge second matrix block into first
-                       MatrixBlock out = arg0; //in-place update
-                       out.merge(arg1, false);
-                       out.examSparsity();
-                       
-                       return out;
-               }       
-       }
-       
-       /**
-        * 
-        */
        private static class CreateCellCombinerFunction implements 
Function<Double, KahanObject> 
        {
                private static final long serialVersionUID = 
3697505233057172994L;
@@ -736,11 +676,25 @@ public class RDDAggregateUtils
        private static class MergeBlocksFunction implements 
Function2<MatrixBlock, MatrixBlock, MatrixBlock> 
        {               
                private static final long serialVersionUID = 
-8881019027250258850L;
-
+               private boolean _deep = false;
+               
+               @SuppressWarnings("unused")
+               public MergeBlocksFunction() {
+                       //by default deep copy first argument
+                       this(true); 
+               }
+               
+               public MergeBlocksFunction(boolean deep) {
+                       _deep = deep;
+               }
+               
                @Override
                public MatrixBlock call(MatrixBlock b1, MatrixBlock b2) 
                        throws Exception 
                {
+                       long b1nnz = b1.getNonZeros();
+                       long b2nnz = b2.getNonZeros();
+                       
                        // sanity check input dimensions
                        if (b1.getNumRows() != b2.getNumRows() || 
b1.getNumColumns() != b2.getNumColumns()) {
                                throw new DMLRuntimeException("Mismatched block 
sizes for: "
@@ -749,14 +703,14 @@ public class RDDAggregateUtils
                        }
 
                        // execute merge (never pass by reference)
-                       MatrixBlock ret = new MatrixBlock(b1);
+                       MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
                        ret.merge(b2, false);
                        ret.examSparsity();
                        
                        // sanity check output number of non-zeros
-                       if (ret.getNonZeros() != b1.getNonZeros() + 
b2.getNonZeros()) {
+                       if (ret.getNonZeros() != b1nnz + b2nnz) {
                                throw new DMLRuntimeException("Number of 
non-zeros does not match: "
-                                               + ret.getNonZeros() + " != " + 
b1.getNonZeros() + " + " + b2.getNonZeros());
+                                               + ret.getNonZeros() + " != " + 
b1nnz + " + " + b2nnz);
                        }
 
                        return ret;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/38d087a7/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index a619a4d..38ebd7e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -899,6 +899,7 @@ public class RDDConverterUtils
                private long _clen = -1;
                private int _brlen = -1;
                private int _bclen = -1;
+               private double _sparsity = 1.0;
                private boolean _sparse = false;
                private boolean _containsID;
                private boolean _isVector;
@@ -908,6 +909,7 @@ public class RDDConverterUtils
                        _clen = mc.getCols();
                        _brlen = mc.getRowsPerBlock();
                        _bclen = mc.getColsPerBlock();
+                       _sparsity = OptimizerUtils.getSparsity(mc);
                        _sparse = sparse;
                        _containsID = containsID;
                        _isVector = isVector;
@@ -976,7 +978,7 @@ public class RDDConverterUtils
                        for( int cix=1; cix<=ncblks; cix++ ) {
                                int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
    
                                ix[cix-1] = new MatrixIndexes(rix, cix);
-                               mb[cix-1] = new MatrixBlock(lrlen, lclen, 
_sparse);             
+                               mb[cix-1] = new MatrixBlock(lrlen, lclen, 
_sparse,(int)(lrlen*lclen*_sparsity));
                        }
                }
                

Reply via email to