Repository: systemml
Updated Branches:
  refs/heads/master 696e79218 -> addd6e121


[SYSTEMML-2223] Repartition ultra-sparse matrices to preferred #parts

This patch improves the spark checkpointing (i.e., distributed caching)
logic by repartitioning ultra-sparse matrices to the preferred number of
partitions as a multiple of the default parallelism.

Furthermore, this patch makes a minor improvement to empty block
handling when aggregating ultra-sparse matrices, avoiding unnecessary GC
overhead.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/015b2731
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/015b2731
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/015b2731

Branch: refs/heads/master
Commit: 015b2731893b5f630a0bdfb8cb0efdf86e84fd05
Parents: 696e792
Author: Matthias Boehm <[email protected]>
Authored: Sat Mar 31 14:54:28 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sat Mar 31 14:54:28 2018 -0700

----------------------------------------------------------------------
 .../spark/CheckpointSPInstruction.java          | 25 ++++++++++++++------
 .../spark/utils/RDDAggregateUtils.java          | 10 ++++----
 .../runtime/matrix/MatrixCharacteristics.java   |  7 ++++++
 3 files changed, 31 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index ccd7319..33dd494 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -41,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class CheckpointSPInstruction extends UnarySPInstruction {
        // default storage level
@@ -100,19 +101,28 @@ public class CheckpointSPInstruction extends 
UnarySPInstruction {
                JavaPairRDD<?,?> out = null;
                if( !in.getStorageLevel().equals( _level ) ) 
                {
-                       //(trigger coalesce if intended number of partitions 
exceeded by 20%
-                       //and not hash partitioned to avoid losing the existing 
partitioner)
+                       //determine need for coalesce or repartition, and csr 
conversion
                        int numPartitions = 
SparkUtils.getNumPreferredPartitions(mcIn, in);
                        boolean coalesce = ( 1.2*numPartitions < 
in.getNumPartitions()
                                && !SparkUtils.isHashPartitioned(in) && 
in.getNumPartitions()
                                > 
SparkExecutionContext.getDefaultParallelism(true));
+                       boolean repartition = mcIn.dimsKnown(true) && 
mcIn.isUltraSparse()
+                               && numPartitions > in.getNumPartitions();
+                       boolean mcsr2csr = 
input1.getDataType()==DataType.MATRIX 
+                               && 
OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
+                               && !_level.equals(Checkpoint.SER_STORAGE_LEVEL);
                        
                        //checkpoint pre-processing rdd operations
                        if( coalesce ) {
                                //merge partitions without shuffle if too many 
partitions
                                out = in.coalesce( numPartitions );
                        }
-                       else {
+                       else if( repartition ) {
+                               //repartition to preferred size as multiple of 
default parallelism
+                               out = 
in.repartition(UtilFunctions.roundToNext(numPartitions,
+                                       
SparkExecutionContext.getDefaultParallelism(true)));
+                       }
+                       else if( !mcsr2csr ) {
                                //since persist is an in-place marker for a 
storage level, we 
                                //apply a narrow shallow copy to allow for 
short-circuit collects 
                                if( input1.getDataType() == DataType.MATRIX )
@@ -120,13 +130,14 @@ public class CheckpointSPInstruction extends 
UnarySPInstruction {
                                                
(JavaPairRDD<MatrixIndexes,MatrixBlock>)in, false);
                                else if( input1.getDataType() == DataType.FRAME)
                                        out = ((JavaPairRDD<Long,FrameBlock>)in)
-                                               .mapValues(new 
CopyFrameBlockFunction(false));  
+                                               .mapValues(new 
CopyFrameBlockFunction(false));
+                       }
+                       else {
+                               out = in;
                        }
                        
                        //convert mcsr into memory-efficient csr if potentially 
sparse
-                       if( input1.getDataType()==DataType.MATRIX 
-                               && 
OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
-                               && !_level.equals(Checkpoint.SER_STORAGE_LEVEL) 
) {
+                       if( mcsr2csr ) {
                                out = 
((JavaPairRDD<MatrixIndexes,MatrixBlock>)out)
                                        .mapValues(new 
CreateSparseBlockFunction(SparseBlock.Type.CSR));
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 476e93f..0101e26 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -256,26 +256,28 @@ public class RDDAggregateUtils
        {
                private static final long serialVersionUID = 
3703543699467085539L;
                
-               private AggregateOperator _op = new AggregateOperator(0, 
KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);  
+               private AggregateOperator _op = new AggregateOperator(0, 
KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE);
                
                @Override
                public CorrMatrixBlock call(CorrMatrixBlock arg0, MatrixBlock 
arg1) 
                        throws Exception 
                {
+                       if( arg1.isEmptyBlock(false) )
+                               return arg0;
+                       
                        //get current block and correction
                        MatrixBlock value = arg0.getValue();
                        MatrixBlock corr = arg0.getCorrection();
                        
                        //correction block allocation on demand
-                       if( corr == null ){
+                       if( corr == null )
                                corr = new MatrixBlock(value.getNumRows(), 
value.getNumColumns(), false);
-                       }
                        
                        //aggregate other input and maintain corrections 
                        //(existing value and corr are used in place)
                        OperationsOnMatrixValues.incrementalAggregation(value, 
corr, arg1, _op, false);
                        return arg0.set(value, corr);
-               }       
+               }
        }
 
        private static class MergeSumBlockCombinerFunction implements 
Function2<CorrMatrixBlock, CorrMatrixBlock, CorrMatrixBlock> 

http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java 
b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index 1443a8c..91d70f8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
 
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.MMTSJ.MMTSJType;
 import org.apache.sysml.runtime.instructions.mr.AggregateBinaryInstruction;
 import org.apache.sysml.runtime.instructions.mr.AggregateInstruction;
@@ -61,6 +62,7 @@ import 
org.apache.sysml.runtime.instructions.mr.UaggOuterChainInstruction;
 import org.apache.sysml.runtime.instructions.mr.UnaryInstruction;
 import org.apache.sysml.runtime.instructions.mr.UnaryMRInstructionBase;
 import org.apache.sysml.runtime.instructions.mr.ZeroOutInstruction;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ReorgOperator;
@@ -225,6 +227,11 @@ public class MatrixCharacteristics implements Serializable
                return ( !ubNnz && nonZero >= 0 );
        }
        
+       public boolean isUltraSparse() {
+               return dimsKnown(true) && OptimizerUtils.getSparsity(this)
+                       < MatrixBlock.ULTRA_SPARSITY_TURN_POINT;
+       }
+       
        public boolean mightHaveEmptyBlocks() {
                long singleBlk = Math.max(Math.min(numRows, numRowsPerBlock),1) 
                                * Math.max(Math.min(numColumns, 
numColumnsPerBlock),1);

Reply via email to