Repository: incubator-systemml
Updated Branches:
  refs/heads/master 5baac2d62 -> f32bd8ebd


[SYSTEMML-1317] Avoid unnecessary copies on creating combiner blocks

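For most aggregations (e.g., merging partial blocks or aggregating by
key), we use combineByKey to allow update-in-place when adding to
combiner blocks. However, so far we have deep-copied blocks, often
unnecessarily, when creating the combiner blocks. This creates
unnecessary memory pressure in scenarios with almost no aggregation.

A shallow copy is safe whenever updating the combiner in place cannot
corrupt data referenced elsewhere. This holds when the input blocks are
temporary intermediates, or when there are no duplicate keys and hence
no merges (e.g., repartitioning). We now exploit this knowledge of
temporary blocks to allow aggregations with shallow copies, which
reduces garbage collection overhead and memory pressure. Callers pass
the new deepCopyCombiner flag to select the behavior. The existing
sumByKeyStable and aggByKeyStable signatures keep deep copying.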


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9919a6c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9919a6c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9919a6c6

Branch: refs/heads/master
Commit: 9919a6c61a288267805f406cb5c1e1030ddf0a20
Parents: 5baac2d
Author: Matthias Boehm <[email protected]>
Authored: Wed Mar 15 18:53:09 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 16 11:30:41 2017 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  5 +-
 .../parfor/ResultMergeRemoteSpark.java          |  2 +-
 .../spark/AggregateTernarySPInstruction.java    |  2 +-
 .../spark/AggregateUnarySPInstruction.java      |  2 +-
 .../instructions/spark/CpmmSPInstruction.java   |  2 +-
 .../spark/CumulativeAggregateSPInstruction.java |  2 +-
 .../instructions/spark/MapmmSPInstruction.java  |  2 +-
 .../instructions/spark/PMapmmSPInstruction.java |  2 +-
 .../ParameterizedBuiltinSPInstruction.java      |  8 +--
 .../instructions/spark/PmmSPInstruction.java    |  2 +-
 .../spark/QuaternarySPInstruction.java          |  2 +-
 .../spark/ReblockSPInstruction.java             |  2 +-
 .../instructions/spark/ReorgSPInstruction.java  |  2 +-
 .../instructions/spark/RmmSPInstruction.java    |  2 +-
 .../instructions/spark/SpoofSPInstruction.java  |  8 +--
 .../instructions/spark/Tsmm2SPInstruction.java  |  2 +-
 .../spark/utils/FrameRDDConverterUtils.java     |  4 +-
 .../spark/utils/RDDAggregateUtils.java          | 57 ++++++++++++++++----
 .../spark/utils/RDDConverterUtils.java          |  4 +-
 .../spark/utils/RDDConverterUtilsExt.java       |  2 +-
 .../instructions/spark/utils/RDDSortUtils.java  |  8 +--
 .../java/org/apache/sysml/utils/Statistics.java |  6 ++-
 22 files changed, 84 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index faababe..648c51e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1185,8 +1185,9 @@ public class SparkExecutionContext extends ExecutionContext
                                in = in.coalesce( numPartitions );
                }
                
-               //repartition rdd (force creation of shuffled rdd via merge)
-               JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
RDDAggregateUtils.mergeByKey(in);
+               //repartition rdd (force creation of shuffled rdd via merge), 
note: without deep copy albeit 
+               //executed on the original data, because there will be no 
merge, i.e., no key duplicates
+               JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
RDDAggregateUtils.mergeByKey(in, false);
                
                //convert mcsr into memory-efficient csr if potentially sparse
                if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {      
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index ef1ae02..49293e4 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -162,7 +162,7 @@ public class ResultMergeRemoteSpark extends ResultMerge
                    else
                    {
                        //direct merge in any order (disjointness guaranteed)
-                       out = RDDAggregateUtils.mergeByKey(rdd);
+                       out = RDDAggregateUtils.mergeByKey(rdd, false);
                    }
                    
                    //Step 3: create output rdd handle w/ lineage

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
index 2a305be..6a494b7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateTernarySPInstruction.java
@@ -116,7 +116,7 @@ public class AggregateTernarySPInstruction extends ComputationSPInstruction
                else //tack+* multi block
                {
                        //multi-block aggregation and drop correction
-                       out = RDDAggregateUtils.aggByKeyStable(out, 
aggop.aggOp);
+                       out = RDDAggregateUtils.aggByKeyStable(out, 
aggop.aggOp, false);
                        out = out.mapValues( new 
AggregateDropCorrectionFunction(aggop.aggOp) );
 
                        //put output RDD handle into symbol table

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
index eb9324f..73f67a3 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AggregateUnarySPInstruction.java
@@ -118,7 +118,7 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction
                        else if( _aggtype == SparkAggType.MULTI_BLOCK ) {
                                //in case of multi-block aggregation, we always 
keep the correction
                                out = out.mapToPair(new RDDUAggFunction(auop, 
mc.getRowsPerBlock(), mc.getColsPerBlock()));                     
-                               out = RDDAggregateUtils.aggByKeyStable(out, 
aggop);
+                               out = RDDAggregateUtils.aggByKeyStable(out, 
aggop, false);
        
                                //drop correction after aggregation if required 
(aggbykey creates 
                                //partitioning, drop correction via 
partitioning-preserving mapvalues)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
index 4227320..4a29c5e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CpmmSPInstruction.java
@@ -111,7 +111,7 @@ public class CpmmSPInstruction extends BinarySPInstruction
                }
                else //DEFAULT: MULTI_BLOCK
                {
-                       out = RDDAggregateUtils.sumByKeyStable(out); 
+                       out = RDDAggregateUtils.sumByKeyStable(out, false); 
                        
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
index e0cad17..9dd81aa 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeAggregateSPInstruction.java
@@ -78,7 +78,7 @@ public class CumulativeAggregateSPInstruction extends AggregateUnarySPInstructio
                AggregateUnaryOperator auop = (AggregateUnaryOperator) _optr;
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
                                in.mapToPair(new RDDCumAggFunction(auop, rlen, 
brlen, bclen));
-               out = RDDAggregateUtils.mergeByKey(out);
+               out = RDDAggregateUtils.mergeByKey(out, false);
                
                //put output handle in symbol table
                sec.setRDDHandleForVariable(output.getName(), out);     

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 33eacbe..5baffb0 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -149,7 +149,7 @@ public class MapmmSPInstruction extends BinarySPInstruction
                                out = out.filter(new 
FilterNonEmptyBlocksFunction());
                        
                        if( _aggtype == SparkAggType.MULTI_BLOCK )
-                               out = RDDAggregateUtils.sumByKeyStable(out);
+                               out = RDDAggregateUtils.sumByKeyStable(out, 
false);
                
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
index ee2cd1e..eceeabc 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PMapmmSPInstruction.java
@@ -120,7 +120,7 @@ public class PMapmmSPInstruction extends BinarySPInstruction
                        //matrix multiplication
                        JavaPairRDD<MatrixIndexes,MatrixBlock> rdd2 = in2
                                        .flatMapToPair(new PMapMMFunction(bpmb, 
i/mc1.getRowsPerBlock()));
-                       rdd2 = RDDAggregateUtils.sumByKeyStable(rdd2);
+                       rdd2 = RDDAggregateUtils.sumByKeyStable(rdd2, false);
                        rdd2.persist(pmapmmStorageLevel)
                            .count();
                        bpmb.unpersist(false);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index d7f4f7d..5c27f60 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -225,7 +225,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                                                target.flatMapToPair(new 
RDDMapGroupedAggFunction(groups, _optr, 
                                                                ngroups, 
mc1.getRowsPerBlock(), mc1.getColsPerBlock()));
                                
-                               out = RDDAggregateUtils.sumByKeyStable(out);
+                               out = RDDAggregateUtils.sumByKeyStable(out, 
false);
                                
                                //updated characteristics and handle outputs
                                mcOut.set(ngroups, mc1.getCols(), 
mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1);
@@ -355,7 +355,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                                                .flatMapToPair(new 
RDDRemoveEmptyFunction(rows, maxDim, brlen, bclen));         
                                }                               
        
-                               out = RDDAggregateUtils.mergeByKey(out);
+                               out = RDDAggregateUtils.mergeByKey(out, false);
                                
                                //store output rdd handle
                                sec.setRDDHandleForVariable(output.getName(), 
out);
@@ -414,7 +414,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        //execute remove empty rows/cols operation
                        JavaPairRDD<MatrixIndexes,MatrixBlock> out = in
                                        .flatMapToPair(new 
RDDRExpandFunction(maxVal, dirRows, cast, ignore, brlen, bclen));            
-                       out = RDDAggregateUtils.mergeByKey(out);
+                       out = RDDAggregateUtils.mergeByKey(out, false);
                        
                        //store output rdd handle
                        sec.setRDDHandleForVariable(output.getName(), out);
@@ -484,7 +484,7 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
                        if( mc.getCols() > mc.getNumColBlocks() ) {
                                in = in.mapToPair(new 
RDDTransformDecodeExpandFunction(
                                                (int)mc.getCols(), 
mc.getColsPerBlock()));
-                               in = RDDAggregateUtils.mergeByKey(in);
+                               in = RDDAggregateUtils.mergeByKey(in, false);
                        }
                        
                        //construct decoder and decode individual matrix blocks

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
index ce87c46..2e6b46e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/PmmSPInstruction.java
@@ -103,7 +103,7 @@ public class PmmSPInstruction extends BinarySPInstruction
                //execute pmm instruction
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = in1
                                .flatMapToPair( new RDDPMMFunction(_type, in2, 
rlen, mc.getRowsPerBlock()) );
-               out = RDDAggregateUtils.sumByKeyStable(out);
+               out = RDDAggregateUtils.sumByKeyStable(out, false);
                
                //put output RDD handle into symbol table
                sec.setRDDHandleForVariable(output.getName(), out);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
index 0ae8a67..7dfe4be 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuaternarySPInstruction.java
@@ -304,7 +304,7 @@ public class QuaternarySPInstruction extends ComputationSPInstruction
                {
                        //aggregation if required (map/redwdivmm)
                        if( qop.wtype3 != null && !qop.wtype3.isBasic() )
-                               out = RDDAggregateUtils.sumByKeyStable( out );
+                               out = RDDAggregateUtils.sumByKeyStable(out, 
false);
                                
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
index a324f9d..f99d47f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReblockSPInstruction.java
@@ -178,7 +178,7 @@ public class ReblockSPInstruction extends UnarySPInstruction
                        
                        JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
                                        in1.flatMapToPair(new 
ExtractBlockForBinaryReblock(mc, mcOut));
-                       out = RDDAggregateUtils.mergeByKey( out );
+                       out = RDDAggregateUtils.mergeByKey(out, false);
                        
                        //put output RDD handle into symbol table
                        sec.setRDDHandleForVariable(output.getName(), out);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
index 62ffbce..ce1c584 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ReorgSPInstruction.java
@@ -139,7 +139,7 @@ public class ReorgSPInstruction extends UnarySPInstruction
                        //execute reverse reorg operation
                        out = in1.flatMapToPair(new RDDRevFunction(mcIn));
                        if( mcIn.getRows() % mcIn.getRowsPerBlock() != 0 )
-                               out = RDDAggregateUtils.mergeByKey(out);
+                               out = RDDAggregateUtils.mergeByKey(out, false);
                }
                else if ( opcode.equalsIgnoreCase("rdiag") ) // DIAG
                {       

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
index e6b5755..1fe025a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RmmSPInstruction.java
@@ -95,7 +95,7 @@ public class RmmSPInstruction extends BinarySPInstruction
                JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
                                tmp1.join( tmp2 )                              
//join by result block 
                            .mapToPair( new RmmMultiplyFunction() );   //do 
matrix multiplication
-               out = RDDAggregateUtils.sumByKeyStable(out);           
//aggregation per result block
+               out = RDDAggregateUtils.sumByKeyStable(out, false);    
//aggregation per result block
                
                //put output block into symbol table (no lineage because single 
block)
                updateBinaryMMOutputMatrixCharacteristics(sec, true);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index c78613d..ca92b9b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -123,9 +123,9 @@ public class SpoofSPInstruction extends SPInstruction
                                        //NOTE: workaround with partition size 
needed due to potential bug in SPARK
                                        //TODO investigate if some other side 
effect of correct blocks
                                        if( out.partitions().size() > 
mcIn.getNumRowBlocks() )
-                                               out = 
RDDAggregateUtils.sumByKeyStable(out, (int)mcIn.getNumRowBlocks());
+                                               out = 
RDDAggregateUtils.sumByKeyStable(out, (int)mcIn.getNumRowBlocks(), false);
                                        else
-                                               out = 
RDDAggregateUtils.sumByKeyStable(out);
+                                               out = 
RDDAggregateUtils.sumByKeyStable(out, false);
                                }
                                sec.setRDDHandleForVariable(_out.getName(), 
out);
                                
@@ -158,9 +158,9 @@ public class SpoofSPInstruction extends SPInstruction
                                        //NOTE: workaround with partition size 
needed due to potential bug in SPARK
                                        //TODO investigate if some other side 
effect of correct blocks
                                        if( in.partitions().size() > 
mcOut.getNumRowBlocks()*mcOut.getNumColBlocks() )
-                                               out = 
RDDAggregateUtils.sumByKeyStable( out, 
(int)(mcOut.getNumRowBlocks()*mcOut.getNumColBlocks()) );
+                                               out = 
RDDAggregateUtils.sumByKeyStable(out, 
(int)(mcOut.getNumRowBlocks()*mcOut.getNumColBlocks()), false);
                                        else
-                                               out = 
RDDAggregateUtils.sumByKeyStable( out );  
+                                               out = 
RDDAggregateUtils.sumByKeyStable(out, false);     
                                }
                                sec.setRDDHandleForVariable(_out.getName(), 
out);
                                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
index da917b5..1f1b3e4 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/Tsmm2SPInstruction.java
@@ -119,7 +119,7 @@ public class Tsmm2SPInstruction extends UnarySPInstruction
                else {
                        //output individual output blocks and aggregate by key 
(no action)
                        JavaPairRDD<MatrixIndexes,MatrixBlock> tmp2 = 
in.flatMapToPair(new RDDTSMM2Function(bpmb, _type));
-                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
RDDAggregateUtils.sumByKeyStable(tmp2);
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> out = 
RDDAggregateUtils.sumByKeyStable(tmp2, false);
                        
                        //put output RDD handle into symbol table
                        
sec.getMatrixCharacteristics(output.getName()).set(outputDim, outputDim, 
mc.getRowsPerBlock(), mc.getColsPerBlock());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 013a1a8..b736931 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -208,7 +208,7 @@ public class FrameRDDConverterUtils
                        
                        //shuffle matrix blocks (instead of frame blocks) in 
order to exploit 
                        //sparse formats (for sparse or wide matrices) during 
shuffle
-                       in = RDDAggregateUtils.mergeByKey(in);
+                       in = RDDAggregateUtils.mergeByKey(in, false);
                }
                        
                //convert individual matrix blocks to frame blocks (w/o shuffle)
@@ -223,7 +223,7 @@ public class FrameRDDConverterUtils
                                .flatMapToPair(new 
BinaryBlockToMatrixBlockFunction(mcIn, mcOut));
        
                //aggregate partial matrix blocks
-               return RDDAggregateUtils.mergeByKey( out );     
+               return RDDAggregateUtils.mergeByKey(out, false);        
        }
        
        //=====================================

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 2dfff74..2759f7f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -42,8 +42,7 @@ import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
  * 
  */
 public class RDDAggregateUtils 
-{
-       
+{      
        //internal configuration to use tree aggregation (treeReduce w/ 
depth=2),
        //this is currently disabled because it was 2x slower than a simple
        //single-block reduce due to additional overhead for shuffling 
@@ -69,15 +68,21 @@ public class RDDAggregateUtils
                }
        }
 
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in ) {
-               return sumByKeyStable(in, in.getNumPartitions());
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
sumByKeyStable(JavaPairRDD<MatrixIndexes, MatrixBlock> in) {
+               return sumByKeyStable(in, in.getNumPartitions(), true);
+       }
+       
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
sumByKeyStable(JavaPairRDD<MatrixIndexes, MatrixBlock> in,
+                       boolean deepCopyCombiner) {
+               return sumByKeyStable(in, in.getNumPartitions(), 
deepCopyCombiner);
        }
        
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> sumByKeyStable( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in, int numPartitions  )
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> 
sumByKeyStable(JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
+                       int numPartitions, boolean deepCopyCombiner)
        {
                //stable sum of blocks per key, by passing correction blocks 
along with aggregates              
                JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-                               in.combineByKey( new 
CreateCorrBlockCombinerFunction(), 
+                               in.combineByKey( new 
CreateCorrBlockCombinerFunction(deepCopyCombiner), 
                                                             new 
MergeSumBlockValueFunction(), 
                                                             new 
MergeSumBlockCombinerFunction(), numPartitions );
                
@@ -134,13 +139,24 @@ public class RDDAggregateUtils
                                new AggregateSingleBlockFunction(aop) );
        }
 
-       public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in, AggregateOperator aop )
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
+                       AggregateOperator aop) {
+               return aggByKeyStable(in, aop, in.getNumPartitions(), true);
+       }
+       
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
+                       AggregateOperator aop, boolean deepCopyCombiner ) {
+               return aggByKeyStable(in, aop, in.getNumPartitions(), 
deepCopyCombiner);
+       }
+       
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> aggByKeyStable( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
+                       AggregateOperator aop, int numPartitions, boolean 
deepCopyCombiner )
        {
                //stable sum of blocks per key, by passing correction blocks 
along with aggregates              
                JavaPairRDD<MatrixIndexes, CorrMatrixBlock> tmp = 
-                               in.combineByKey( new 
CreateCorrBlockCombinerFunction(), 
+                               in.combineByKey( new 
CreateCorrBlockCombinerFunction(deepCopyCombiner), 
                                                             new 
MergeAggBlockValueFunction(aop), 
-                                                            new 
MergeAggBlockCombinerFunction(aop) );
+                                                            new 
MergeAggBlockCombinerFunction(aop), numPartitions );
                
                //strip-off correction blocks from                              
             
                JavaPairRDD<MatrixIndexes, MatrixBlock> out =  
@@ -170,6 +186,21 @@ public class RDDAggregateUtils
         * assumption of disjoint data is violated.
         * 
         * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+        * @param deepCopyCombiner indicator if the createCombiner functions 
needs to deep copy the input block
+        * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
+        */
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> mergeByKey( 
JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
+               boolean deepCopyCombiner ) {
+               return mergeByKey(in, in.getNumPartitions(), deepCopyCombiner); 
+       }
+       
+       /**
+        * Merges disjoint data of all blocks per key.
+        * 
+        * Note: The behavior of this method is undefined for both sparse and 
dense data if the 
+        * assumption of disjoint data is violated.
+        * 
+        * @param in matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
         * @param numPartitions number of output partitions
         * @param deepCopyCombiner indicator if the createCombiner functions 
needs to deep copy the input block
         * @return matrix as {@code JavaPairRDD<MatrixIndexes, MatrixBlock>}
@@ -205,13 +236,19 @@ public class RDDAggregateUtils
        {
                private static final long serialVersionUID = 
-3666451526776017343L;
 
+               private final boolean _deep;
+
+               public CreateCorrBlockCombinerFunction(boolean deep) {
+                       _deep = deep;
+               }
+               
                @Override
                public CorrMatrixBlock call(MatrixBlock arg0) 
                        throws Exception 
                {
                        //deep copy to allow update in-place
                        return new CorrMatrixBlock(
-                                       new MatrixBlock(arg0));
+                               _deep ? new MatrixBlock(arg0) : arg0);
                }       
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 134a071..c5058a6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -92,7 +92,7 @@ public class RDDConverterUtils
                }
                
                //aggregate partial matrix blocks
-               out = RDDAggregateUtils.mergeByKey( out ); 
+               out = RDDAggregateUtils.mergeByKey(out, false); 
                
                return out;
        }
@@ -112,7 +112,7 @@ public class RDDConverterUtils
                }
                
                //aggregate partial matrix blocks
-               out = RDDAggregateUtils.mergeByKey( out ); 
+               out = RDDAggregateUtils.mergeByKey(out, false); 
                
                return out;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index cdf090d..6409735 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -110,7 +110,7 @@ public class RDDConverterUtilsExt
                }
                
                //aggregate partial matrix blocks
-               out = RDDAggregateUtils.mergeByKey( out ); 
+               out = RDDAggregateUtils.mergeByKey(out, false); 
                
                return out;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
index de1190a..046c015 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDSortUtils.java
@@ -66,7 +66,7 @@ public class RDDSortUtils
                JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals
                                .zipWithIndex()
                        .mapPartitionsToPair(new 
ConvertToBinaryBlockFunction(rlen, brlen));
-               ret = RDDAggregateUtils.mergeByKey(ret);        
+               ret = RDDAggregateUtils.mergeByKey(ret, false);
                
                return ret;
        }
@@ -88,7 +88,7 @@ public class RDDSortUtils
                JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals
                                .zipWithIndex()
                        .mapPartitionsToPair(new 
ConvertToBinaryBlockFunction2(rlen, brlen));
-               ret = RDDAggregateUtils.mergeByKey(ret);                
+               ret = RDDAggregateUtils.mergeByKey(ret, false);         
                
                return ret;
        }
@@ -111,7 +111,7 @@ public class RDDSortUtils
                JavaPairRDD<MatrixIndexes, MatrixBlock> ret = sdvals
                                .zipWithIndex()
                        .mapPartitionsToPair(new 
ConvertToBinaryBlockFunction3(rlen, brlen));
-               ret = RDDAggregateUtils.mergeByKey(ret);                
+               ret = RDDAggregateUtils.mergeByKey(ret, false);         
                
                return ret;     
        }
@@ -137,7 +137,7 @@ public class RDDSortUtils
                                .mapToPair(new ExtractIndexFunction())
                                .sortByKey()
                        .mapPartitionsToPair(new 
ConvertToBinaryBlockFunction4(rlen, brlen));
-               ixmap = RDDAggregateUtils.mergeByKey(ixmap);            
+               ixmap = RDDAggregateUtils.mergeByKey(ixmap, false);             
                
                //replicate indexes for all column blocks
                JavaPairRDD<MatrixIndexes, MatrixBlock> rixmap = ixmap

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9919a6c6/src/main/java/org/apache/sysml/utils/Statistics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/Statistics.java 
b/src/main/java/org/apache/sysml/utils/Statistics.java
index 801e402..097f58e 100644
--- a/src/main/java/org/apache/sysml/utils/Statistics.java
+++ b/src/main/java/org/apache/sysml/utils/Statistics.java
@@ -175,9 +175,11 @@ public class Statistics
        public static void resetNoOfCompiledJobs( int count ) {
                //reset both mr/sp for multiple tests within one jvm
                numCompiledSPInst.reset();
-               numCompiledSPInst.add(count);
                numCompiledMRJobs.reset();
-               numCompiledMRJobs.add(count);
+               if( OptimizerUtils.isSparkExecutionMode() )
+                       numCompiledSPInst.add(count);
+               else
+                       numCompiledMRJobs.add(count);
        }
 
        public static void resetNoOfExecutedJobs() {

Reply via email to