[SYSTEMML-2217] Fix OOM read sparse textcell matrix w/ unknown nnz

In case of forced single-node execution, reading ultra-sparse textcell
matrices with an unknown number of non-zeros (nnz) can cause unnecessary
out-of-memory errors because we always allocate the output in dense
format when the nnz is unknown. This patch fixes this robustness issue
by obtaining a rough estimate of the number of non-zeros based on the
ratio of the file size on HDFS to the expected size in dense format.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/094f5551
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/094f5551
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/094f5551

Branch: refs/heads/master
Commit: 094f5551dd08f1d1747d70bbf23cc246d503d95e
Parents: 665d037
Author: Matthias Boehm <[email protected]>
Authored: Thu Mar 29 21:34:46 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Thu Mar 29 21:34:46 2018 -0700

----------------------------------------------------------------------
 .../runtime/controlprogram/caching/MatrixObject.java     |  3 ++-
 .../java/org/apache/sysml/runtime/io/ReaderTextCell.java | 11 +++++++----
 .../apache/sysml/runtime/io/ReaderTextCellParallel.java  |  2 ++
 .../org/apache/sysml/runtime/matrix/data/InputInfo.java  |  9 +++++++--
 .../org/apache/sysml/runtime/util/MapReduceTool.java     |  8 ++++++++
 5 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/094f5551/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index f258ea8..de49222 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -421,7 +421,8 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                }
                
                //read matrix and maintain meta data
-               double sparsity = (mc.getNonZeros() >= 0 ? 
((double)mc.getNonZeros())/(mc.getRows()*mc.getCols()) : 1.0d); 
+               double sparsity = (mc.getNonZeros() < 0) ? 
(iimd.getInputInfo().isTextIJV()?-1:1) :
+                       
OptimizerUtils.getSparsity(mc.getNonZeros(),mc.getRows(),mc.getCols());
                MatrixBlock newData = DataConverter.readMatrixFromHDFS(fname, 
iimd.getInputInfo(), rlen, clen,
                                mc.getRowsPerBlock(), mc.getColsPerBlock(), 
sparsity, getFileFormatProperties());
                setHDFSFileExists(true);

http://git-wip-us.apache.org/repos/asf/systemml/blob/094f5551/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 918ab6b..1fa8940 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -41,6 +41,7 @@ import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
+import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class ReaderTextCell extends MatrixReader
 {
@@ -55,14 +56,16 @@ public class ReaderTextCell extends MatrixReader
        public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long 
clen, int brlen, int bclen, long estnnz) 
                throws IOException, DMLRuntimeException 
        {
-               //allocate output matrix block
-               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, 
(int)rlen, (int)clen, estnnz, true, false);
-               
                //prepare file access
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
+               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path( fname );
                FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
+               //allocate output matrix block
+               if( estnnz < 0 )
+                       estnnz = MapReduceTool.estimateNnzBasedOnFileSize(path, 
rlen, clen, brlen, bclen, 3);
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, 
(int)rlen, (int)clen, estnnz, true, false);
+               
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/094f5551/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index 040b43c..72e0cde 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -91,6 +91,8 @@ public class ReaderTextCellParallel extends MatrixReader
                checkValidInputFile(fs, path);
                
                //allocate output matrix block
+               if( estnnz < 0 )
+                       estnnz = MapReduceTool.estimateNnzBasedOnFileSize(path, 
rlen, clen, brlen, bclen, 3);
                MatrixBlock ret = createOutputMatrixBlock(rlen, clen, 
(int)rlen, (int)clen, estnnz, true, false);
        
                //core read 

http://git-wip-us.apache.org/repos/asf/systemml/blob/094f5551/src/main/java/org/apache/sysml/runtime/matrix/data/InputInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/InputInfo.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/InputInfo.java
index 2bbad62..02df7d1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/InputInfo.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/InputInfo.java
@@ -54,10 +54,15 @@ public class InputInfo implements Serializable
                inputValueClass=valueCls;
        }
 
-       public void setMetaData(MetaData md)
-       {
+       public void setMetaData(MetaData md) {
                metadata=md;
        }
+       
+       public boolean isTextIJV() {
+               return this == InputInfo.TextCellInputInfo
+                       || this == InputInfo.MatrixMarketInputInfo;
+       }
+       
        public static final InputInfo TextCellInputInfo=new 
InputInfo(TextInputFormat.class, 
                         LongWritable.class, Text.class);
        public static final InputInfo MatrixMarketInputInfo = new InputInfo 
(TextInputFormat.class, 

http://git-wip-us.apache.org/repos/asf/systemml/blob/094f5551/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java 
b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index 52ad388..c6d9604 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -238,6 +239,13 @@ public class MapReduceTool
                        FileUtil.copy(fs, originalPath, fs, newPath, 
deleteSource, overwrite, job);
                }
        }
+       
+       public static long estimateNnzBasedOnFileSize(Path path,
+               long rlen, long clen, int brlen, int bclen, double factor) 
throws IOException
+       {
+               return (long) Math.min(rlen*clen, 
rlen*clen*(getFilesizeOnHDFS(path)/factor/
+                       
OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, brlen, bclen, 
1.0)));
+       }
 
        /**
         * Returns the size of a file or directory on hdfs in bytes.

Reply via email to