Repository: incubator-systemml
Updated Branches:
  refs/heads/master 9a77b7bcc -> 0f4571810


[SYSTEMML-1442] Reduce the JVM memory required for transferring Numpy/Scipy array

Initial experiments show that it takes 17.52155303955078 seconds to
transfer a NumPy array (shape=10000 X 5000) to SystemML as a MatrixBlock
and output its sum (using MLContext) using the existing approach, whereas
it takes 458.05856013298035 seconds to do the same via DataFrame _py2java.

```python
def executeNumpy():
        script = sml.dml('print("X:" + sum(X))').input(X=np.random.rand(num_rows,num_cols))
        ml.execute(script)

def executeDF():
        script = sml.dml('print("X:" + sum(X))').input(X=sqlCtx.createDataFrame(pd.DataFrame(np.random.rand(num_rows,num_cols))))
        ml.execute(script)

```

Closes #443.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0f457181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0f457181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0f457181

Branch: refs/heads/master
Commit: 0f45718106b8864ef5aee56a742644d11eabf3e8
Parents: 9a77b7b
Author: Niketan Pansare <[email protected]>
Authored: Thu Mar 30 13:43:53 2017 -0700
Committer: Niketan Pansare <[email protected]>
Committed: Thu Mar 30 13:49:49 2017 -0700

----------------------------------------------------------------------
 .../sysml/api/mlcontext/MLContextUtil.java      | 16 ++++--
 .../spark/utils/RDDConverterUtilsExt.java       | 52 ++++++++++---------
 src/main/python/systemml/converters.py          | 53 +++++++++-----------
 3 files changed, 62 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0f457181/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index 17aa4a7..4cd95d4 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -889,11 +889,17 @@ public final class MLContextUtil {
                                }
                                sb.append(") ");
 
-                               sb.append(key);
-                               sb.append(": ");
-                               String str = object.toString();
-                               str = StringUtils.abbreviate(str, 100);
-                               sb.append(str);
+                                sb.append(key);
+                                sb.append(": ");
+                                String str = null;
+                                if(object instanceof MatrixBlock) {
+                                        MatrixBlock mb = (MatrixBlock) object;
+                                        str = "MatrixBlock [sparse? = " + 
mb.isInSparseFormat() + ", nonzeros = " + mb.getNonZeros() + ", size: " + 
mb.getNumRows() + " X " + mb.getNumColumns() + "]";  
+                                }
+                                else 
+                                        str = object.toString(); // TODO: Deal 
with OOM for other objects such as Frame, etc
+                                str = StringUtils.abbreviate(str, 100);
+                                sb.append(str);
                                sb.append("\n");
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0f457181/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index d764109..f206fbd 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -165,35 +165,37 @@ public class RDDConverterUtilsExt
                return convertPy4JArrayToMB(data, (int) rlen, (int) clen, 
isSparse);
        }
        
-       public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, int 
numRowsPerBlock, int rlen, int clen, boolean isSparse) throws 
DMLRuntimeException {
-               return mergeRowBlocks(mb, (long)numRowsPerBlock, (long)rlen, 
(long)clen, isSparse);
-       }
-       
-       /**
-        * This creates a MatrixBlock from list of row blocks
-        * 
-        * @param mb list of row blocks
-        * @param numRowsPerBlock number of rows per block
-        * @param rlen number of rows
-        * @param clen number of columns
-        * @param isSparse is the output matrix in sparse format
-        * @return a matrix block of shape (rlen, clen)
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, 
long numRowsPerBlock, long rlen, long clen, boolean isSparse) throws 
DMLRuntimeException {
-               if(clen >= Integer.MAX_VALUE)
-                       throw new DMLRuntimeException("Number of columns cannot 
be greater than " + Integer.MAX_VALUE);
-               if(rlen >= Integer.MAX_VALUE)
-                       throw new DMLRuntimeException("Number of rows cannot be 
greater than " + Integer.MAX_VALUE);
-               
-               MatrixBlock ret = new MatrixBlock((int)rlen, (int) clen, 
isSparse);
+       public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, 
boolean isSparse) {
+               MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
                ret.allocateDenseOrSparseBlock();
-               for(int i = 0; i < mb.size(); i++) {
-                       ret.copy((int)(i*numRowsPerBlock), 
(int)Math.min((i+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb.get(i), 
false);
+               return ret;
+       }
+       public static MatrixBlock allocateDenseOrSparse(long rlen, long clen, 
boolean isSparse) throws DMLRuntimeException {
+               if(rlen > Integer.MAX_VALUE || clen > Integer.MAX_VALUE) {
+                       throw new DMLRuntimeException("Dimensions of matrix are 
too large to be passed via NumPy/SciPy:" + rlen + " X " + clen);
                }
+               return allocateDenseOrSparse(rlen, clen, isSparse);
+       }
+       
+       public static void copyRowBlocks(MatrixBlock mb, int rowIndex, 
MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws 
DMLRuntimeException {
+               copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, 
(long)rlen, (long)clen);
+       }
+       public static void copyRowBlocks(MatrixBlock mb, long rowIndex, 
MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws 
DMLRuntimeException {
+               copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, 
(long)rlen, (long)clen);
+       }
+       public static void copyRowBlocks(MatrixBlock mb, int rowIndex, 
MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) throws 
DMLRuntimeException {
+               copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, 
(long)rlen, (long)clen);
+       }
+       public static void copyRowBlocks(MatrixBlock mb, long rowIndex, 
MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) throws 
DMLRuntimeException {
+               // TODO: Double-check if synchronization is required here.
+               // synchronized (RDDConverterUtilsExt.class) {
+                       ret.copy((int)(rowIndex*numRowsPerBlock), 
(int)Math.min((rowIndex+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb, 
false);
+               // }
+       }
+       
+       public static void postProcessAfterCopying(MatrixBlock ret) throws 
DMLRuntimeException {
                ret.recomputeNonZeros();
                ret.examSparsity();
-               return ret;
        }
        
        public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, 
int clen, boolean isSparse) throws DMLRuntimeException {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0f457181/src/main/python/systemml/converters.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/converters.py 
b/src/main/python/systemml/converters.py
index 202d19a..416e5f9 100644
--- a/src/main/python/systemml/converters.py
+++ b/src/main/python/systemml/converters.py
@@ -25,7 +25,7 @@ import numpy as np
 import pandas as pd
 import math
 from pyspark.context import SparkContext
-from scipy.sparse import coo_matrix, spmatrix
+from scipy.sparse import coo_matrix, spmatrix, csr_matrix
 from .classloader import *
 
 SUPPORTED_TYPES = (np.ndarray, pd.DataFrame, spmatrix)
@@ -57,6 +57,7 @@ def convertToLabeledDF(sparkSession, X, y=None):
         return out.select('features')
 
 def _convertSPMatrixToMB(sc, src):
+    src = coo_matrix(src,  dtype=np.float64)
     numRows = src.shape[0]
     numCols = src.shape[1]
     data = src.data
@@ -77,40 +78,34 @@ def _convertDenseMatrixToMB(sc, src):
     createJavaObject(sc, 'dummy')
     return 
sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf,
 numRows, numCols)
 
+def _copyRowBlock(i, sc, ret, src, numRowsPerBlock,  rlen, clen):
+    rowIndex = int(i / numRowsPerBlock)
+    mb = _convertSPMatrixToMB(sc, src[i:i+numRowsPerBlock,]) if 
isinstance(src, spmatrix) else _convertDenseMatrixToMB(sc, 
src[i:i+numRowsPerBlock,])
+    
sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.copyRowBlocks(mb,
 rowIndex, ret, numRowsPerBlock, rlen, clen)
+    return i
+    
 def convertToMatrixBlock(sc, src, maxSizeBlockInMB=8):
-    if isinstance(src, spmatrix):
-        src = coo_matrix(src,  dtype=np.float64)
-    else:
-        src = np.asarray(src, dtype=np.float64)
+    if not isinstance(sc, SparkContext):
+        raise TypeError('sc needs to be of type SparkContext')
+    isSparse = True if isinstance(src, spmatrix) else False
+    src = np.asarray(src, dtype=np.float64) if not isSparse else src
     if len(src.shape) != 2:
-        hint = ''
-        num_dim = len(src.shape)
-        type1 = str(type(src).__name__)
-        if type(src) == np.ndarray and num_dim == 1:
-            hint = '. Hint: If you intend to pass the 1-dimensional ndarray as 
a column-vector, please reshape it: input_ndarray.reshape(-1, 1)'
-        elif num_dim > 2:
-            hint = '. Hint: If you intend to pass a tensor, please reshape it 
into (N, CHW) format'
-        raise TypeError('Expected 2-dimensional ' + type1 + ', instead passed 
' + str(num_dim) + '-dimensional ' + type1 + hint)
+        src_type = str(type(src).__name__)
+        raise TypeError('Expected 2-dimensional ' + src_type + ', instead 
passed ' + str(len(src.shape)) + '-dimensional ' + src_type)
+    # Ignoring sparsity for computing numRowsPerBlock for now
     numRowsPerBlock = int(math.ceil((maxSizeBlockInMB*1000000) / 
(src.shape[1]*8)))
     multiBlockTransfer = False if numRowsPerBlock >= src.shape[0] else True
     if not multiBlockTransfer:
-        if isinstance(src, spmatrix):
-            return _convertSPMatrixToMB(sc, src)
-        elif isinstance(sc, SparkContext):
-            return _convertDenseMatrixToMB(sc, src)
-        else:
-            raise TypeError('sc needs to be of type SparkContext')
+        return _convertSPMatrixToMB(sc, src) if isSparse else 
_convertDenseMatrixToMB(sc, src)
     else:
-        if isinstance(src, spmatrix):
-            numRowsPerBlock = 1 # To avoid unnecessary conversion to csr and 
then coo again
-            rowBlocks = [ _convertSPMatrixToMB(sc, src.getrow(i)) for i in  
range(src.shape[0]) ]
-            isSparse = True
-        elif isinstance(sc, SparkContext):
-            rowBlocks = [ _convertDenseMatrixToMB(sc, 
src[i:i+numRowsPerBlock,]) for i in  range(0, src.shape[0], numRowsPerBlock) ]
-            isSparse = False
-        else:
-            raise TypeError('sc needs to be of type SparkContext')
-        return 
sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.mergeRowBlocks(rowBlocks,
 int(numRowsPerBlock), int(src.shape[0]), int(src.shape[1]), isSparse)
+        # Since coo_matrix does not have range indexing
+        src = csr_matrix(src) if isSparse else src
+        rlen = int(src.shape[0])
+        clen = int(src.shape[1])
+        ret = 
sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.allocateDenseOrSparse(rlen,
 clen, isSparse)
+        [ _copyRowBlock(i, sc, ret, src, numRowsPerBlock,  rlen, clen) for i 
in range(0, src.shape[0], numRowsPerBlock) ]
+        
sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.postProcessAfterCopying(ret)
+        return ret
 
 def convertToNumPyArr(sc, mb):
     if isinstance(sc, SparkContext):

Reply via email to