Repository: incubator-systemml
Updated Branches:
  refs/heads/master f32bd8ebd -> 81090134d


[SYSTEMML-1370] Convert numpy to matrixblock by passing multiple blocks

Here is the code to test this functionality:

from systemml import MLContext, dml, convertToMatrixBlock
import pandas as pd
nr = 46900
X_pd = pd.DataFrame(range(1, (nr*784)+1, 1), dtype=float).values.reshape(nr, 784)
convertToMatrixBlock(sc, X_pd)
convertToMatrixBlock(sc, X_pd, maxSizeBlockInMB=100000)
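
With the default maxSizeBlockInMB=8, the first call above exercises the new
multi-block transfer path, while the large cap in the second call keeps the
whole matrix in a single transfer. As a back-of-the-envelope check (mirroring
the arithmetic in converters.py; the variable names here are illustrative):

import math
nr, nc = 46900, 784
maxSizeBlockInMB = 8
# 8 bytes per double value; a MB is taken as 10^6 bytes, as in the converter
numRowsPerBlock = int(math.ceil((maxSizeBlockInMB * 1000000) / (nc * 8)))
# numRowsPerBlock == 1276, so the 46900-row matrix is sent in 37 row blocks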

Closes #413.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/81090134
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/81090134
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/81090134

Branch: refs/heads/master
Commit: 81090134d2de04a3ae90c6f8d79b4c68cb14aab5
Parents: f32bd8e
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Fri Mar 17 10:20:26 2017 -0800
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Fri Mar 17 11:20:25 2017 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtilsExt.java       | 31 +++++++++
 src/main/python/systemml/converters.py          | 66 +++++++++++++-------
 2 files changed, 75 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81090134/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index 6409735..d764109 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -165,6 +165,37 @@ public class RDDConverterUtilsExt
                return convertPy4JArrayToMB(data, (int) rlen, (int) clen, isSparse);
        }
        
+       public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, int numRowsPerBlock, int rlen, int clen, boolean isSparse) throws DMLRuntimeException {
+               return mergeRowBlocks(mb, (long)numRowsPerBlock, (long)rlen, (long)clen, isSparse);
+       }
+       
+       /**
+        * This creates a MatrixBlock from a list of row blocks.
+        * 
+        * @param mb list of row blocks
+        * @param numRowsPerBlock number of rows per block
+        * @param rlen number of rows
+        * @param clen number of columns
+        * @param isSparse whether the output matrix is in sparse format
+        * @return a matrix block of shape (rlen, clen)
+        * @throws DMLRuntimeException if DMLRuntimeException occurs
+        */
+       public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, long numRowsPerBlock, long rlen, long clen, boolean isSparse) throws DMLRuntimeException {
+               if(clen >= Integer.MAX_VALUE)
+                       throw new DMLRuntimeException("Number of columns cannot be greater than " + Integer.MAX_VALUE);
+               if(rlen >= Integer.MAX_VALUE)
+                       throw new DMLRuntimeException("Number of rows cannot be greater than " + Integer.MAX_VALUE);
+               
+               MatrixBlock ret = new MatrixBlock((int)rlen, (int) clen, isSparse);
+               ret.allocateDenseOrSparseBlock();
+               for(int i = 0; i < mb.size(); i++) {
+                       ret.copy((int)(i*numRowsPerBlock), (int)Math.min((i+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb.get(i), false);
+               }
+               ret.recomputeNonZeros();
+               ret.examSparsity();
+               return ret;
+       }
+       
        public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen, boolean isSparse) throws DMLRuntimeException {
                MatrixBlock mb = new MatrixBlock(rlen, clen, isSparse, -1);
                if(isSparse) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81090134/src/main/python/systemml/converters.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/converters.py b/src/main/python/systemml/converters.py
index 2d32508..ce709f5 100644
--- a/src/main/python/systemml/converters.py
+++ b/src/main/python/systemml/converters.py
@@ -23,6 +23,7 @@ __all__ = [ 'getNumCols', 'convertToMatrixBlock', 'convertToNumPyArr', 'convertT
 
 import numpy as np
 import pandas as pd
+import math
 from pyspark.context import SparkContext
 from scipy.sparse import coo_matrix, spmatrix
 from .classloader import *
@@ -55,32 +56,53 @@ def convertToLabeledDF(sparkSession, X, y=None):
     else:
         return out.select('features')
 
+def _convertSPMatrixToMB(sc, src):
+    numRows = src.shape[0]
+    numCols = src.shape[1]
+    data = src.data
+    row = src.row.astype(np.int32)
+    col = src.col.astype(np.int32)
+    nnz = len(src.col)
+    buf1 = bytearray(data.tostring())
+    buf2 = bytearray(row.tostring())
+    buf3 = bytearray(col.tostring())
+    createJavaObject(sc, 'dummy')
+    return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertSciPyCOOToMB(buf1, buf2, buf3, numRows, numCols, nnz)
+            
+def _convertDenseMatrixToMB(sc, src):
+    numCols = getNumCols(src)
+    numRows = src.shape[0]
+    arr = src.ravel().astype(np.float64)
+    buf = bytearray(arr.tostring())
+    createJavaObject(sc, 'dummy')
+    return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf, numRows, numCols)
 
-def convertToMatrixBlock(sc, src):
+def convertToMatrixBlock(sc, src, maxSizeBlockInMB=8):
     if isinstance(src, spmatrix):
         src = coo_matrix(src,  dtype=np.float64)
-        numRows = src.shape[0]
-        numCols = src.shape[1]
-        data = src.data
-        row = src.row.astype(np.int32)
-        col = src.col.astype(np.int32)
-        nnz = len(src.col)
-        buf1 = bytearray(data.tostring())
-        buf2 = bytearray(row.tostring())
-        buf3 = bytearray(col.tostring())
-        createJavaObject(sc, 'dummy')
-        return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertSciPyCOOToMB(buf1, buf2, buf3, numRows, numCols, nnz)
-    elif isinstance(sc, SparkContext):
-        src = np.asarray(src)
-        numCols = getNumCols(src)
-        numRows = src.shape[0]
-        arr = src.ravel().astype(np.float64)
-        buf = bytearray(arr.tostring())
-        createJavaObject(sc, 'dummy')
-        return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf, numRows, numCols)
     else:
-        raise TypeError('sc needs to be of type SparkContext') # TODO: We can generalize this by creating py4j gateway ourselves
-
+        src = np.asarray(src, dtype=np.float64)
+    numRowsPerBlock = int(math.ceil((maxSizeBlockInMB*1000000) / (src.shape[1]*8)))
+    # print("numRowsPerBlock=" + str(numRowsPerBlock))
+    multiBlockTransfer = False if numRowsPerBlock >= src.shape[0] else True
+    if not multiBlockTransfer:
+        if isinstance(src, spmatrix):
+            return _convertSPMatrixToMB(sc, src)
+        elif isinstance(sc, SparkContext):
+            return _convertDenseMatrixToMB(sc, src)
+        else:
+            raise TypeError('sc needs to be of type SparkContext')
+    else:
+        if isinstance(src, spmatrix):
+            numRowsPerBlock = 1 # To avoid unnecessary conversion to csr and then coo again
+            rowBlocks = [ _convertSPMatrixToMB(sc, src.getrow(i)) for i in range(src.shape[0]) ]
+            isSparse = True
+        elif isinstance(sc, SparkContext):
+            rowBlocks = [ _convertDenseMatrixToMB(sc, src[i:i+numRowsPerBlock,]) for i in range(0, src.shape[0], numRowsPerBlock) ]
+            isSparse = False
+        else:
+            raise TypeError('sc needs to be of type SparkContext')
+        return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.mergeRowBlocks(rowBlocks, int(numRowsPerBlock), int(src.shape[0]), int(src.shape[1]), isSparse)
 
 def convertToNumPyArr(sc, mb):
     if isinstance(sc, SparkContext):
