[SYSTEMML-382] MCSR-CSR sparse block conversion on rdd checkpoints Our default sparse block representation, MCSR, is memory-inefficient - especially in blocked representation - which causes unnecessary spilling, memory bandwidth requirements, and garbage collection on Spark. However, this format allows very efficient incremental construction. As an initial heuristic, we convert sparse blocks from MCSR to the compact CSR format on checkpoint instructions, which reduces the size of cached RDDs and hence prevents unnecessary spilling. The additional block copy is likely amortized across repeated accesses. Also note that the bulk construction of CSR blocks is done in linear time and is very efficient.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e3f79680 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e3f79680 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e3f79680 Branch: refs/heads/master Commit: e3f79680b862306c5221535ebed306b99637e043 Parents: c359e6c Author: Matthias Boehm <[email protected]> Authored: Thu Jan 21 08:58:45 2016 -0800 Committer: Matthias Boehm <[email protected]> Committed: Thu Jan 21 08:59:22 2016 -0800 ---------------------------------------------------------------------- src/main/java/org/apache/sysml/hops/Hop.java | 6 +-- .../org/apache/sysml/hops/OptimizerUtils.java | 4 ++ .../java/org/apache/sysml/lops/Checkpoint.java | 3 +- .../spark/CheckpointSPInstruction.java | 13 ++++- .../functions/CreateSparseBlockFunction.java | 50 ++++++++++++++++++++ .../sysml/runtime/matrix/data/MatrixBlock.java | 16 +++++++ .../runtime/matrix/data/SparseBlockFactory.java | 38 +++++++++++++++ 7 files changed, 124 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/hops/Hop.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java index e031a87..7ddc995 100644 --- a/src/main/java/org/apache/sysml/hops/Hop.java +++ b/src/main/java/org/apache/sysml/hops/Hop.java @@ -24,7 +24,6 @@ import java.util.HashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.conf.ConfigurationManager; @@ -341,6 +340,7 @@ public abstract class Hop * * @throws HopsException */ + @SuppressWarnings("unused") //see 
CHECKPOINT_SPARSE_CSR private void constructAndSetCheckpointLopIfRequired() throws HopsException { @@ -373,13 +373,13 @@ public abstract class Hop //investigate need for serialized storage of large sparse matrices //(compile- instead of runtime-level for better debugging) boolean serializedStorage = false; - if( dimsKnown(true) ) { + if( dimsKnown(true) && !Checkpoint.CHECKPOINT_SPARSE_CSR ) { double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(_dim1, _dim2, _rows_in_block, _cols_in_block, _nnz); double dataCache = SparkExecutionContext.getConfiguredTotalDataMemory(true); serializedStorage = (MatrixBlock.evalSparseFormatInMemory(_dim1, _dim2, _nnz) && matrixPSize > dataCache ); //sparse in-memory does not fit in agg mem } - else { + else if( !dimsKnown(true) ) { setRequiresRecompile(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 64de05e..19acbd1 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -1102,6 +1102,10 @@ public class OptimizerUtils return ret; } + public static double getSparsity( MatrixCharacteristics mc ) { + return getSparsity(mc.getRows(), mc.getCols(), mc.getNonZeros()); + } + public static double getSparsity( long dim1, long dim2, long nnz ) { if( dim1<=0 || dim2<=0 || nnz<0 ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/lops/Checkpoint.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Checkpoint.java b/src/main/java/org/apache/sysml/lops/Checkpoint.java index 5fa8456..dd82b20 100644 --- 
a/src/main/java/org/apache/sysml/lops/Checkpoint.java +++ b/src/main/java/org/apache/sysml/lops/Checkpoint.java @@ -41,9 +41,10 @@ import org.apache.sysml.parser.Expression.ValueType; public class Checkpoint extends Lop { public static final String OPCODE = "chkpoint"; - + public static final StorageLevel DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK(); public static final StorageLevel SER_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK_SER(); + public static final boolean CHECKPOINT_SPARSE_CSR = true; public static final String STORAGE_LEVEL = "storage.level"; private StorageLevel _storageLevel; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java index c5f69c1..6a0aa34 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java @@ -21,8 +21,8 @@ package org.apache.sysml.runtime.instructions.spark; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.storage.StorageLevel; - import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.lops.Checkpoint; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.DMLUnsupportedOperationException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -34,9 +34,11 @@ import org.apache.sysml.runtime.instructions.cp.BooleanObject; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction; +import 
org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.operators.Operator; @@ -110,7 +112,14 @@ public class CheckpointSPInstruction extends UnarySPInstruction //apply a narrow shallow copy to allow for short-circuit collects out = in.mapValues(new CopyBlockFunction(false)); } - + + //convert mcsr into memory-efficient csr if potentially sparse + if( OptimizerUtils.getSparsity(mcIn) < MatrixBlock.SPARSITY_TURN_POINT + && Checkpoint.CHECKPOINT_SPARSE_CSR ) + { + out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR)); + } + //actual checkpoint into given storage level out = out.persist( _level ); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java new file mode 100644 index 0000000..51f3217 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysml.runtime.instructions.spark.functions; + +import org.apache.spark.api.java.function.Function; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock; + +/** + * General purpose copy function for binary block values. This function can be used in + * mapValues (copy matrix blocks) to change the internal sparse block representation. + * See CopyBlockFunction if no change of SparseBlock.Type required. + * + */ +public class CreateSparseBlockFunction implements Function<MatrixBlock,MatrixBlock> +{ + private static final long serialVersionUID = -4503367283351708178L; + + private SparseBlock.Type _stype = null; + + public CreateSparseBlockFunction( SparseBlock.Type stype ) { + _stype = stype; + } + + @Override + public MatrixBlock call(MatrixBlock arg0) + throws Exception + { + if( arg0.isInSparseFormat() ) + return new MatrixBlock(arg0, _stype); + else //pass through dense + return arg0; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 4cd00ca..ee1d3a5 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -175,6 
+175,22 @@ public class MatrixBlock extends MatrixValue implements Externalizable this.copy(that); } + public MatrixBlock(MatrixBlock that, SparseBlock.Type stype) + { + //sanity check sparse matrix block + if( !that.isInSparseFormat() ) + throw new RuntimeException("Sparse matrix block expected."); + + //deep copy and change sparse block type + rlen = that.rlen; + clen = that.clen; + sparse = that.sparse; + nonZeros = that.nonZeros; + estimatedNNzsPerRow = that.estimatedNNzsPerRow; + sparseBlock = SparseBlockFactory + .copySparseBlock(stype, that.sparseBlock, true); + } + //////// // Initialization methods // (reset, init, allocate, etc) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java index 7d5a5b1..2de508e 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java @@ -45,4 +45,42 @@ public abstract class SparseBlockFactory throw new RuntimeException("Unexpected sparse block type: "+type.toString()); } } + + /** + * + * @param type + * @param sblock + * @return + */ + public static SparseBlock copySparseBlock( SparseBlock.Type type, SparseBlock sblock ) { + return copySparseBlock(type, sblock, false); + } + + /** + * + * @param type + * @param sblock + * @param forceCopy + * @return + */ + public static SparseBlock copySparseBlock( SparseBlock.Type type, SparseBlock sblock, boolean forceCopy ) + { + //check for existing target type + if( !forceCopy && + ( (sblock instanceof SparseBlockMCSR && type == SparseBlock.Type.MCSR) + ||(sblock instanceof SparseBlockCSR && type == SparseBlock.Type.CSR) + ||(sblock instanceof 
SparseBlockCOO && type == SparseBlock.Type.COO)) ) + { + return sblock; + } + + //create target sparse block + switch( type ) { + case MCSR: return new SparseBlockMCSR(sblock); + case CSR: return new SparseBlockCSR(sblock); + case COO: return new SparseBlockCOO(sblock); + default: + throw new RuntimeException("Unexpected sparse block type: "+type.toString()); + } + } }
