[SYSTEMML-382] MCSR-CSR sparse block conversion on rdd checkpoints

Our default sparse block representation, MCSR, is memory-inefficient -
especially in blocked representation - which causes unnecessary spilling,
memory bandwidth requirements, and garbage collection on Spark. However,
this format allows very efficient incremental construction. As an
initial heuristic, we convert sparse blocks from MCSR to the compact CSR
format on checkpoint instructions, which reduces the size of cached RDDs
and hence prevents unnecessary spilling. The additional block copy is
likely amortized across repeated accesses. Also note that the bulk
construction of CSR blocks is done in linear time and is very efficient.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e3f79680
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e3f79680
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e3f79680

Branch: refs/heads/master
Commit: e3f79680b862306c5221535ebed306b99637e043
Parents: c359e6c
Author: Matthias Boehm <[email protected]>
Authored: Thu Jan 21 08:58:45 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Thu Jan 21 08:59:22 2016 -0800

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/Hop.java    |  6 +--
 .../org/apache/sysml/hops/OptimizerUtils.java   |  4 ++
 .../java/org/apache/sysml/lops/Checkpoint.java  |  3 +-
 .../spark/CheckpointSPInstruction.java          | 13 ++++-
 .../functions/CreateSparseBlockFunction.java    | 50 ++++++++++++++++++++
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 16 +++++++
 .../runtime/matrix/data/SparseBlockFactory.java | 38 +++++++++++++++
 7 files changed, 124 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java 
b/src/main/java/org/apache/sysml/hops/Hop.java
index e031a87..7ddc995 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -341,6 +340,7 @@ public abstract class Hop
         * 
         * @throws HopsException
         */
+       @SuppressWarnings("unused") //see CHECKPOINT_SPARSE_CSR
        private void constructAndSetCheckpointLopIfRequired() 
                throws HopsException
        {
@@ -373,13 +373,13 @@ public abstract class Hop
                                //investigate need for serialized storage of 
large sparse matrices
                                //(compile- instead of runtime-level for better 
debugging)
                                boolean serializedStorage = false;
-                               if( dimsKnown(true) ) {
+                               if( dimsKnown(true) && 
!Checkpoint.CHECKPOINT_SPARSE_CSR ) {
                                        double matrixPSize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity(_dim1, _dim2, 
_rows_in_block, _cols_in_block, _nnz);
                                        double dataCache = 
SparkExecutionContext.getConfiguredTotalDataMemory(true);
                                        serializedStorage = 
(MatrixBlock.evalSparseFormatInMemory(_dim1, _dim2, _nnz)
                                                                     && 
matrixPSize > dataCache ); //sparse in-memory does not fit in agg mem 
                                }
-                               else {
+                               else if( !dimsKnown(true) ) {
                                        setRequiresRecompile();
                                }
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 64de05e..19acbd1 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -1102,6 +1102,10 @@ public class OptimizerUtils
                return ret; 
        }
        
+       public static double getSparsity( MatrixCharacteristics mc ) {
+               return getSparsity(mc.getRows(), mc.getCols(), 
mc.getNonZeros());
+       }
+       
        public static double getSparsity( long dim1, long dim2, long nnz )
        {
                if( dim1<=0 || dim2<=0 || nnz<0 )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/lops/Checkpoint.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Checkpoint.java 
b/src/main/java/org/apache/sysml/lops/Checkpoint.java
index 5fa8456..dd82b20 100644
--- a/src/main/java/org/apache/sysml/lops/Checkpoint.java
+++ b/src/main/java/org/apache/sysml/lops/Checkpoint.java
@@ -41,9 +41,10 @@ import org.apache.sysml.parser.Expression.ValueType;
 public class Checkpoint extends Lop 
 {
        public static final String OPCODE = "chkpoint"; 
-       
+        
        public static final StorageLevel DEFAULT_STORAGE_LEVEL = 
StorageLevel.MEMORY_AND_DISK();
        public static final StorageLevel SER_STORAGE_LEVEL = 
StorageLevel.MEMORY_AND_DISK_SER();
+       public static final boolean CHECKPOINT_SPARSE_CSR = true; 
        public static final String STORAGE_LEVEL = "storage.level"; 
 
        private StorageLevel _storageLevel;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index c5f69c1..6a0aa34 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -21,8 +21,8 @@ package org.apache.sysml.runtime.instructions.spark;
 
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.storage.StorageLevel;
-
 import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.Checkpoint;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLUnsupportedOperationException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -34,9 +34,11 @@ import 
org.apache.sysml.runtime.instructions.cp.BooleanObject;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction;
+import 
org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.operators.Operator;
 
 
@@ -110,7 +112,14 @@ public class CheckpointSPInstruction extends 
UnarySPInstruction
                                //apply a narrow shallow copy to allow for 
short-circuit collects 
                                out = in.mapValues(new 
CopyBlockFunction(false));       
                        }
-                               
+               
+                       //convert mcsr into memory-efficient csr if potentially 
sparse
+                       if( OptimizerUtils.getSparsity(mcIn) < 
MatrixBlock.SPARSITY_TURN_POINT
+                               && Checkpoint.CHECKPOINT_SPARSE_CSR )
+                       {                               
+                               out = out.mapValues(new 
CreateSparseBlockFunction(SparseBlock.Type.CSR));
+                       }
+                       
                        //actual checkpoint into given storage level
                        out = out.persist( _level );
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java
new file mode 100644
index 0000000..51f3217
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/CreateSparseBlockFunction.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.runtime.instructions.spark.functions;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+
+/**
+ * General purpose copy function for binary block values. This function can be 
used in
+ * mapValues (copy matrix blocks) to change the internal sparse block 
representation. 
+ * See CopyBlockFunction if no change of SparseBlock.Type required.
+ * 
+ */
+public class CreateSparseBlockFunction implements 
Function<MatrixBlock,MatrixBlock> 
+{
+       private static final long serialVersionUID = -4503367283351708178L;
+       
+       private SparseBlock.Type _stype = null;
+       
+       public CreateSparseBlockFunction( SparseBlock.Type stype ) {
+               _stype = stype;
+       }
+
+       @Override
+       public MatrixBlock call(MatrixBlock arg0)
+               throws Exception 
+       {
+               if( arg0.isInSparseFormat() )
+                       return new MatrixBlock(arg0, _stype);
+               else //pass through dense
+                       return arg0;    
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 4cd00ca..ee1d3a5 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -175,6 +175,22 @@ public class MatrixBlock extends MatrixValue implements 
Externalizable
                this.copy(that);
        }
        
+       public MatrixBlock(MatrixBlock that, SparseBlock.Type stype)
+       {
+               //sanity check sparse matrix block
+               if( !that.isInSparseFormat() )
+                       throw new RuntimeException("Sparse matrix block 
expected.");
+               
+               //deep copy and change sparse block type
+               rlen = that.rlen;
+               clen = that.clen;
+               sparse = that.sparse;
+               nonZeros = that.nonZeros;
+               estimatedNNzsPerRow = that.estimatedNNzsPerRow;
+               sparseBlock = SparseBlockFactory
+                               .copySparseBlock(stype, that.sparseBlock, true);
+       }
+       
        ////////
        // Initialization methods
        // (reset, init, allocate, etc)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e3f79680/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java
index 7d5a5b1..2de508e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockFactory.java
@@ -45,4 +45,42 @@ public abstract class SparseBlockFactory
                                throw new RuntimeException("Unexpected sparse 
block type: "+type.toString());
                }
        }
+       
+       /**
+        * 
+        * @param type
+        * @param sblock
+        * @return
+        */
+       public static SparseBlock copySparseBlock( SparseBlock.Type type, 
SparseBlock sblock ) {
+               return copySparseBlock(type, sblock, false);
+       }
+       
+       /**
+        * 
+        * @param type
+        * @param sblock
+        * @param forceCopy
+        * @return
+        */
+       public static SparseBlock copySparseBlock( SparseBlock.Type type, 
SparseBlock sblock, boolean forceCopy )
+       {
+               //check for existing target type
+               if( !forceCopy && 
+                       ( (sblock instanceof SparseBlockMCSR && type == 
SparseBlock.Type.MCSR)
+                       ||(sblock instanceof SparseBlockCSR && type == 
SparseBlock.Type.CSR)
+                       ||(sblock instanceof SparseBlockCOO && type == 
SparseBlock.Type.COO))  )
+               {
+                       return sblock;
+               }
+               
+               //create target sparse block
+               switch( type ) {
+                       case MCSR: return new SparseBlockMCSR(sblock);
+                       case CSR: return new SparseBlockCSR(sblock);
+                       case COO: return new SparseBlockCOO(sblock);
+                       default:
+                               throw new RuntimeException("Unexpected sparse 
block type: "+type.toString());
+               }
+       }
 }

Reply via email to