Repository: incubator-systemml
Updated Branches:
refs/heads/master 9a77b7bcc -> 0f4571810
[SYSTEMML-1442] Reduce the JVM memory required for transferring NumPy/SciPy arrays
Initial experiments show that transferring a NumPy array (shape = 10000 X 5000)
to SystemML as a MatrixBlock and printing its sum (via MLContext) takes
17.52 seconds with the existing approach, whereas the same operation takes
458.06 seconds when routed through DataFrame _py2java.
```python
import numpy as np
import pandas as pd
import systemml as sml
# Assumes ml = sml.MLContext(sc), sqlCtx, num_rows and num_cols are already defined.

def executeNumpy():
    # NumPy array handed to SystemML directly (converted to a MatrixBlock)
    script = sml.dml('print("X:" + sum(X))').input(X=np.random.rand(num_rows, num_cols))
    ml.execute(script)

def executeDF():
    # Same data routed through a Spark DataFrame (_py2java)
    script = sml.dml('print("X:" + sum(X))').input(
        X=sqlCtx.createDataFrame(pd.DataFrame(np.random.rand(num_rows, num_cols))))
    ml.execute(script)
```
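For the 10000 X 5000 input above, the converter's default 8 MB block size
(maxSizeBlockInMB=8 in the patched converters.py) works out to 200 rows per
slab, so the ~400 MB of float64 data crosses Py4J in 50 small buffers instead
of one monolithic byte array. A quick sanity check of that arithmetic:

```python
import math

num_rows, num_cols, maxSizeBlockInMB = 10000, 5000, 8
# One float64 row occupies num_cols * 8 bytes; cap each slab at ~8 MB.
numRowsPerBlock = int(math.ceil((maxSizeBlockInMB * 1000000) / (num_cols * 8)))
numBlocks = int(math.ceil(num_rows / float(numRowsPerBlock)))
print(numRowsPerBlock, numBlocks)  # -> 200 50
```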
Closes #443.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0f457181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0f457181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0f457181
Branch: refs/heads/master
Commit: 0f45718106b8864ef5aee56a742644d11eabf3e8
Parents: 9a77b7b
Author: Niketan Pansare <[email protected]>
Authored: Thu Mar 30 13:43:53 2017 -0700
Committer: Niketan Pansare <[email protected]>
Committed: Thu Mar 30 13:49:49 2017 -0700
----------------------------------------------------------------------
.../sysml/api/mlcontext/MLContextUtil.java | 16 ++++--
.../spark/utils/RDDConverterUtilsExt.java | 52 ++++++++++---------
src/main/python/systemml/converters.py | 53 +++++++++-----------
3 files changed, 62 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0f457181/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index 17aa4a7..4cd95d4 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -889,11 +889,17 @@ public final class MLContextUtil {
 			}
 			sb.append(") ");
-			sb.append(key);
-			sb.append(": ");
-			String str = object.toString();
-			str = StringUtils.abbreviate(str, 100);
-			sb.append(str);
+			sb.append(key);
+			sb.append(": ");
+			String str = null;
+			if(object instanceof MatrixBlock) {
+				MatrixBlock mb = (MatrixBlock) object;
+				str = "MatrixBlock [sparse? = " + mb.isInSparseFormat() + ", nonzeros = " + mb.getNonZeros() + ", size: " + mb.getNumRows() + " X " + mb.getNumColumns() + "]";
+			}
+			else
+				str = object.toString(); // TODO: Deal with OOM for other objects such as Frame, etc
+			str = StringUtils.abbreviate(str, 100);
+			sb.append(str);
 			sb.append("\n");
 		}
 	}
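With this change, a MatrixBlock input is summarized from its metadata instead of
first materializing object.toString() and then abbreviating it, which could
itself exhaust the heap for large matrices. For example, a dense 10000 X 5000
random input now displays as
"MatrixBlock [sparse? = false, nonzeros = 50000000, size: 10000 X 5000]".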
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0f457181/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index d764109..f206fbd 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -165,35 +165,37 @@ public class RDDConverterUtilsExt
 		return convertPy4JArrayToMB(data, (int) rlen, (int) clen, isSparse);
 	}
 
-	public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, int numRowsPerBlock, int rlen, int clen, boolean isSparse) throws DMLRuntimeException {
-		return mergeRowBlocks(mb, (long)numRowsPerBlock, (long)rlen, (long)clen, isSparse);
-	}
-
-	/**
-	 * This creates a MatrixBlock from a list of row blocks
-	 *
-	 * @param mb list of row blocks
-	 * @param numRowsPerBlock number of rows per block
-	 * @param rlen number of rows
-	 * @param clen number of columns
-	 * @param isSparse is the output matrix in sparse format
-	 * @return a matrix block of shape (rlen, clen)
-	 * @throws DMLRuntimeException if DMLRuntimeException occurs
-	 */
-	public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, long numRowsPerBlock, long rlen, long clen, boolean isSparse) throws DMLRuntimeException {
-		if(clen >= Integer.MAX_VALUE)
-			throw new DMLRuntimeException("Number of columns cannot be greater than " + Integer.MAX_VALUE);
-		if(rlen >= Integer.MAX_VALUE)
-			throw new DMLRuntimeException("Number of rows cannot be greater than " + Integer.MAX_VALUE);
-
-		MatrixBlock ret = new MatrixBlock((int)rlen, (int) clen, isSparse);
+	public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse) {
+		MatrixBlock ret = new MatrixBlock(rlen, clen, isSparse);
 		ret.allocateDenseOrSparseBlock();
-		for(int i = 0; i < mb.size(); i++) {
-			ret.copy((int)(i*numRowsPerBlock), (int)Math.min((i+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb.get(i), false);
+		return ret;
+	}
+
+	public static MatrixBlock allocateDenseOrSparse(long rlen, long clen, boolean isSparse) throws DMLRuntimeException {
+		if(rlen > Integer.MAX_VALUE || clen > Integer.MAX_VALUE) {
+			throw new DMLRuntimeException("Dimensions of matrix are too large to be passed via NumPy/SciPy: " + rlen + " X " + clen);
 		}
+		return allocateDenseOrSparse((int)rlen, (int)clen, isSparse);
+	}
+
+	public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws DMLRuntimeException {
+		copyRowBlocks(mb, (long)rowIndex, ret, (long)numRowsPerBlock, (long)rlen, (long)clen);
+	}
+	public static void copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) throws DMLRuntimeException {
+		copyRowBlocks(mb, rowIndex, ret, (long)numRowsPerBlock, (long)rlen, (long)clen);
+	}
+	public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) throws DMLRuntimeException {
+		copyRowBlocks(mb, (long)rowIndex, ret, numRowsPerBlock, rlen, clen);
+	}
+	public static void copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) throws DMLRuntimeException {
+		// TODO: Double-check if synchronization is required here.
+		// synchronized (RDDConverterUtilsExt.class) {
+		ret.copy((int)(rowIndex*numRowsPerBlock), (int)Math.min((rowIndex+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb, false);
+		// }
+	}
+
+	public static void postProcessAfterCopying(MatrixBlock ret) throws DMLRuntimeException {
 		ret.recomputeNonZeros();
 		ret.examSparsity();
-		return ret;
 	}
 
 	public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen, boolean isSparse) throws DMLRuntimeException {
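Taken together, the new helpers replace the removed mergeRowBlocks contract,
which required every row block to be alive in the JVM at once, with an
allocate-copy-finalize sequence driven from Python. A condensed sketch of that
sequence for the dense case, using the helper names from this patch (assumes
sc is a live SparkContext, src is a 2-D float64 NumPy array, numRowsPerBlock is
computed as in converters.py, and _convertDenseMatrixToMB is the existing
converter there):

```python
utilsExt = sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
rlen, clen = src.shape
# Allocate the full-size target MatrixBlock once on the JVM side.
ret = utilsExt.allocateDenseOrSparse(rlen, clen, False)
for i in range(0, rlen, numRowsPerBlock):
    # Ship one row slab over Py4J and copy it into place.
    mb = _convertDenseMatrixToMB(sc, src[i:i+numRowsPerBlock, ])
    utilsExt.copyRowBlocks(mb, i // numRowsPerBlock, ret, numRowsPerBlock, rlen, clen)
# Recompute nonzeros and re-examine sparsity after all slabs have landed.
utilsExt.postProcessAfterCopying(ret)
```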
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0f457181/src/main/python/systemml/converters.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/converters.py b/src/main/python/systemml/converters.py
index 202d19a..416e5f9 100644
--- a/src/main/python/systemml/converters.py
+++ b/src/main/python/systemml/converters.py
@@ -25,7 +25,7 @@ import numpy as np
 import pandas as pd
 import math
 from pyspark.context import SparkContext
-from scipy.sparse import coo_matrix, spmatrix
+from scipy.sparse import coo_matrix, spmatrix, csr_matrix
 from .classloader import *
 
 SUPPORTED_TYPES = (np.ndarray, pd.DataFrame, spmatrix)
@@ -57,6 +57,7 @@ def convertToLabeledDF(sparkSession, X, y=None):
         return out.select('features')
 
 def _convertSPMatrixToMB(sc, src):
+    src = coo_matrix(src, dtype=np.float64)
     numRows = src.shape[0]
     numCols = src.shape[1]
     data = src.data
@@ -77,40 +78,34 @@ def _convertDenseMatrixToMB(sc, src):
         createJavaObject(sc, 'dummy')
     return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf, numRows, numCols)
 
+def _copyRowBlock(i, sc, ret, src, numRowsPerBlock, rlen, clen):
+    rowIndex = int(i / numRowsPerBlock)
+    mb = _convertSPMatrixToMB(sc, src[i:i+numRowsPerBlock,]) if isinstance(src, spmatrix) else _convertDenseMatrixToMB(sc, src[i:i+numRowsPerBlock,])
+    sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.copyRowBlocks(mb, rowIndex, ret, numRowsPerBlock, rlen, clen)
+    return i
+
 def convertToMatrixBlock(sc, src, maxSizeBlockInMB=8):
-    if isinstance(src, spmatrix):
-        src = coo_matrix(src, dtype=np.float64)
-    else:
-        src = np.asarray(src, dtype=np.float64)
+    if not isinstance(sc, SparkContext):
+        raise TypeError('sc needs to be of type SparkContext')
+    isSparse = isinstance(src, spmatrix)
+    src = src if isSparse else np.asarray(src, dtype=np.float64)
     if len(src.shape) != 2:
-        hint = ''
-        num_dim = len(src.shape)
-        type1 = str(type(src).__name__)
-        if type(src) == np.ndarray and num_dim == 1:
-            hint = '. Hint: If you intend to pass the 1-dimensional ndarray as a column-vector, please reshape it: input_ndarray.reshape(-1, 1)'
-        elif num_dim > 2:
-            hint = '. Hint: If you intend to pass a tensor, please reshape it into (N, CHW) format'
-        raise TypeError('Expected 2-dimensional ' + type1 + ', instead passed ' + str(num_dim) + '-dimensional ' + type1 + hint)
+        src_type = str(type(src).__name__)
+        raise TypeError('Expected 2-dimensional ' + src_type + ', instead passed ' + str(len(src.shape)) + '-dimensional ' + src_type)
+    # Ignoring sparsity when computing numRowsPerBlock for now
     numRowsPerBlock = int(math.ceil((maxSizeBlockInMB*1000000) / (src.shape[1]*8)))
     multiBlockTransfer = False if numRowsPerBlock >= src.shape[0] else True
     if not multiBlockTransfer:
-        if isinstance(src, spmatrix):
-            return _convertSPMatrixToMB(sc, src)
-        elif isinstance(sc, SparkContext):
-            return _convertDenseMatrixToMB(sc, src)
-        else:
-            raise TypeError('sc needs to be of type SparkContext')
+        return _convertSPMatrixToMB(sc, src) if isSparse else _convertDenseMatrixToMB(sc, src)
     else:
-        if isinstance(src, spmatrix):
-            numRowsPerBlock = 1 # To avoid unnecessary conversion to csr and then coo again
-            rowBlocks = [ _convertSPMatrixToMB(sc, src.getrow(i)) for i in range(src.shape[0]) ]
-            isSparse = True
-        elif isinstance(sc, SparkContext):
-            rowBlocks = [ _convertDenseMatrixToMB(sc, src[i:i+numRowsPerBlock,]) for i in range(0, src.shape[0], numRowsPerBlock) ]
-            isSparse = False
-        else:
-            raise TypeError('sc needs to be of type SparkContext')
-        return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.mergeRowBlocks(rowBlocks, int(numRowsPerBlock), int(src.shape[0]), int(src.shape[1]), isSparse)
+        # coo_matrix does not support range indexing, so switch to CSR first
+        src = csr_matrix(src) if isSparse else src
+        rlen = int(src.shape[0])
+        clen = int(src.shape[1])
+        ret = sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.allocateDenseOrSparse(rlen, clen, isSparse)
+        [ _copyRowBlock(i, sc, ret, src, numRowsPerBlock, rlen, clen) for i in range(0, src.shape[0], numRowsPerBlock) ]
+        sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.postProcessAfterCopying(ret)
+        return ret
 
 def convertToNumPyArr(sc, mb):
     if isinstance(sc, SparkContext):
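End to end, the patched converter can be exercised as in the hypothetical
driver below (assumes a running SparkContext sc; convertToMatrixBlock and
convertToNumPyArr are the functions shown in this diff):

```python
import numpy as np
import scipy.sparse
from systemml.converters import convertToMatrixBlock, convertToNumPyArr

X = np.random.rand(10000, 5000)
mb = convertToMatrixBlock(sc, X)   # dense path: 50 row slabs of ~8 MB each
S = scipy.sparse.random(10000, 5000, density=0.01, format='csr')
mbs = convertToMatrixBlock(sc, S)  # sparse path: per-slab COO conversion
X2 = convertToNumPyArr(sc, mb)     # round-trip back to NumPy
```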