Repository: systemml Updated Branches: refs/heads/master 696e79218 -> addd6e121
[SYSTEMML-2223] Repartition ultra-sparse matrices to preferred #parts This patch improves the spark checkpointing (i.e., distributed caching) logic by repartitioning ultra-sparse matrices to the preferred number of partitions as a multiple of the default parallelism. Furthermore, this also makes a minor improvement for empty block handling on aggregating ultra-sparse matrices to avoid unnecessary GC overhead. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/015b2731 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/015b2731 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/015b2731 Branch: refs/heads/master Commit: 015b2731893b5f630a0bdfb8cb0efdf86e84fd05 Parents: 696e792 Author: Matthias Boehm <[email protected]> Authored: Sat Mar 31 14:54:28 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Mar 31 14:54:28 2018 -0700 ---------------------------------------------------------------------- .../spark/CheckpointSPInstruction.java | 25 ++++++++++++++------ .../spark/utils/RDDAggregateUtils.java | 10 ++++---- .../runtime/matrix/MatrixCharacteristics.java | 7 ++++++ 3 files changed, 31 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java index ccd7319..33dd494 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java @@ -41,6 +41,7 @@ import 
org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.operators.Operator; +import org.apache.sysml.runtime.util.UtilFunctions; public class CheckpointSPInstruction extends UnarySPInstruction { // default storage level @@ -100,19 +101,28 @@ public class CheckpointSPInstruction extends UnarySPInstruction { JavaPairRDD<?,?> out = null; if( !in.getStorageLevel().equals( _level ) ) { - //(trigger coalesce if intended number of partitions exceeded by 20% - //and not hash partitioned to avoid losing the existing partitioner) + //determine need for coalesce or repartition, and csr conversion int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in); boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions() && !SparkUtils.isHashPartitioned(in) && in.getNumPartitions() > SparkExecutionContext.getDefaultParallelism(true)); + boolean repartition = mcIn.dimsKnown(true) && mcIn.isUltraSparse() + && numPartitions > in.getNumPartitions(); + boolean mcsr2csr = input1.getDataType()==DataType.MATRIX + && OptimizerUtils.checkSparseBlockCSRConversion(mcIn) + && !_level.equals(Checkpoint.SER_STORAGE_LEVEL); //checkpoint pre-processing rdd operations if( coalesce ) { //merge partitions without shuffle if too many partitions out = in.coalesce( numPartitions ); } - else { + else if( repartition ) { + //repartition to preferred size as multiple of default parallelism + out = in.repartition(UtilFunctions.roundToNext(numPartitions, + SparkExecutionContext.getDefaultParallelism(true))); + } + else if( !mcsr2csr ) { //since persist is an in-place marker for a storage level, we //apply a narrow shallow copy to allow for short-circuit collects if( input1.getDataType() == DataType.MATRIX ) @@ -120,13 +130,14 @@ public class CheckpointSPInstruction extends UnarySPInstruction { (JavaPairRDD<MatrixIndexes,MatrixBlock>)in, false); else if( 
input1.getDataType() == DataType.FRAME) out = ((JavaPairRDD<Long,FrameBlock>)in) - .mapValues(new CopyFrameBlockFunction(false)); + .mapValues(new CopyFrameBlockFunction(false)); + } + else { + out = in; } //convert mcsr into memory-efficient csr if potentially sparse - if( input1.getDataType()==DataType.MATRIX - && OptimizerUtils.checkSparseBlockCSRConversion(mcIn) - && !_level.equals(Checkpoint.SER_STORAGE_LEVEL) ) { + if( mcsr2csr ) { out = ((JavaPairRDD<MatrixIndexes,MatrixBlock>)out) .mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR)); } http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java index 476e93f..0101e26 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java @@ -256,26 +256,28 @@ public class RDDAggregateUtils { private static final long serialVersionUID = 3703543699467085539L; - private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE); + private AggregateOperator _op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.NONE); @Override public CorrMatrixBlock call(CorrMatrixBlock arg0, MatrixBlock arg1) throws Exception { + if( arg1.isEmptyBlock(false) ) + return arg0; + //get current block and correction MatrixBlock value = arg0.getValue(); MatrixBlock corr = arg0.getCorrection(); //correction block allocation on demand - if( corr == null ){ + if( corr == null ) corr = new MatrixBlock(value.getNumRows(), value.getNumColumns(), false); - } //aggregate 
other input and maintain corrections //(existing value and corr are used in place) OperationsOnMatrixValues.incrementalAggregation(value, corr, arg1, _op, false); return arg0.set(value, corr); - } + } } private static class MergeSumBlockCombinerFunction implements Function2<CorrMatrixBlock, CorrMatrixBlock, CorrMatrixBlock> http://git-wip-us.apache.org/repos/asf/systemml/blob/015b2731/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java index 1443a8c..91d70f8 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java @@ -24,6 +24,7 @@ import java.io.Serializable; import java.util.Arrays; import java.util.HashMap; +import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.lops.MMTSJ.MMTSJType; import org.apache.sysml.runtime.instructions.mr.AggregateBinaryInstruction; import org.apache.sysml.runtime.instructions.mr.AggregateInstruction; @@ -61,6 +62,7 @@ import org.apache.sysml.runtime.instructions.mr.UaggOuterChainInstruction; import org.apache.sysml.runtime.instructions.mr.UnaryInstruction; import org.apache.sysml.runtime.instructions.mr.UnaryMRInstructionBase; import org.apache.sysml.runtime.instructions.mr.ZeroOutInstruction; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysml.runtime.matrix.operators.ReorgOperator; @@ -225,6 +227,11 @@ public class MatrixCharacteristics implements Serializable return ( !ubNnz && nonZero >= 0 ); } + public boolean isUltraSparse() { + return dimsKnown(true) && OptimizerUtils.getSparsity(this) + 
< MatrixBlock.ULTRA_SPARSITY_TURN_POINT; + } + public boolean mightHaveEmptyBlocks() { long singleBlk = Math.max(Math.min(numRows, numRowsPerBlock),1) * Math.max(Math.min(numColumns, numColumnsPerBlock),1);
