Repository: incubator-systemml Updated Branches: refs/heads/master f32bd8ebd -> 81090134d
[SYSTEMML-1370] Convert numpy to matrixblock by passing multiple blocks Here is the code to test this functionality: from systemml import MLContext, dml, convertToMatrixBlock import pandas as pd nr = 46900 X_pd = pd.DataFrame(range(1, (nr*784)+1,1),dtype=float).values.reshape(nr,784) convertToMatrixBlock(sc, X_pd) convertToMatrixBlock(sc, X_pd, maxSizeBlockInMB=100000) Closes #413. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/81090134 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/81090134 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/81090134 Branch: refs/heads/master Commit: 81090134d2de04a3ae90c6f8d79b4c68cb14aab5 Parents: f32bd8e Author: Niketan Pansare <npan...@us.ibm.com> Authored: Fri Mar 17 10:20:26 2017 -0800 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Fri Mar 17 11:20:25 2017 -0700 ---------------------------------------------------------------------- .../spark/utils/RDDConverterUtilsExt.java | 31 +++++++++ src/main/python/systemml/converters.py | 66 +++++++++++++------- 2 files changed, 75 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81090134/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index 6409735..d764109 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -165,6 +165,37 @@ public class RDDConverterUtilsExt return convertPy4JArrayToMB(data, (int) rlen, (int) clen, isSparse); } + public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, int numRowsPerBlock, int rlen, int clen, boolean isSparse) throws DMLRuntimeException { + return mergeRowBlocks(mb, (long)numRowsPerBlock, (long)rlen, (long)clen, isSparse); + } + + /** + * This creates a MatrixBlock from list of row blocks + * + * @param mb list of row blocks + * @param numRowsPerBlock number of rows per block + * @param rlen number of rows + * @param clen number of columns + * @param isSparse is the output matrix in sparse format + * @return a matrix block of shape (rlen, clen) + * @throws DMLRuntimeException if DMLRuntimeException occurs + */ + public static MatrixBlock mergeRowBlocks(ArrayList<MatrixBlock> mb, long numRowsPerBlock, long rlen, long clen, boolean isSparse) throws DMLRuntimeException { + if(clen >= Integer.MAX_VALUE) + throw new DMLRuntimeException("Number of columns cannot be greater than " + Integer.MAX_VALUE); + if(rlen >= Integer.MAX_VALUE) + throw new DMLRuntimeException("Number of rows cannot be greater than " + Integer.MAX_VALUE); + + MatrixBlock ret = new MatrixBlock((int)rlen, (int) clen, isSparse); + ret.allocateDenseOrSparseBlock(); + for(int i = 0; i < mb.size(); i++) { + ret.copy((int)(i*numRowsPerBlock), (int)Math.min((i+1)*numRowsPerBlock-1, rlen-1), 0, (int)(clen-1), mb.get(i), false); + } + ret.recomputeNonZeros(); + ret.examSparsity(); + return ret; + } + public static MatrixBlock convertPy4JArrayToMB(byte [] data, int rlen, int clen, boolean isSparse) throws DMLRuntimeException { MatrixBlock mb = new MatrixBlock(rlen, clen, isSparse, -1); if(isSparse) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/81090134/src/main/python/systemml/converters.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/converters.py b/src/main/python/systemml/converters.py index 2d32508..ce709f5 100644 --- a/src/main/python/systemml/converters.py +++ b/src/main/python/systemml/converters.py @@ -23,6 +23,7 @@ __all__ = [ 'getNumCols', 'convertToMatrixBlock', 'convertToNumPyArr', 'convertT import numpy as np import pandas as pd +import math from pyspark.context import SparkContext from scipy.sparse import coo_matrix, spmatrix from .classloader import * @@ -55,32 +56,53 @@ def convertToLabeledDF(sparkSession, X, y=None): else: return out.select('features') +def _convertSPMatrixToMB(sc, src): + numRows = src.shape[0] + numCols = src.shape[1] + data = src.data + row = src.row.astype(np.int32) + col = src.col.astype(np.int32) + nnz = len(src.col) + buf1 = bytearray(data.tostring()) + buf2 = bytearray(row.tostring()) + buf3 = bytearray(col.tostring()) + createJavaObject(sc, 'dummy') + return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertSciPyCOOToMB(buf1, buf2, buf3, numRows, numCols, nnz) + +def _convertDenseMatrixToMB(sc, src): + numCols = getNumCols(src) + numRows = src.shape[0] + arr = src.ravel().astype(np.float64) + buf = bytearray(arr.tostring()) + createJavaObject(sc, 'dummy') + return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf, numRows, numCols) -def convertToMatrixBlock(sc, src): +def convertToMatrixBlock(sc, src, maxSizeBlockInMB=8): if isinstance(src, spmatrix): src = coo_matrix(src, dtype=np.float64) - numRows = src.shape[0] - numCols = src.shape[1] - data = src.data - row = src.row.astype(np.int32) - col = src.col.astype(np.int32) - nnz = len(src.col) - buf1 = bytearray(data.tostring()) - buf2 = bytearray(row.tostring()) - buf3 = bytearray(col.tostring()) - createJavaObject(sc, 'dummy') - return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertSciPyCOOToMB(buf1, buf2, buf3, numRows, numCols, nnz) - elif isinstance(sc, SparkContext): - src = np.asarray(src) - numCols = getNumCols(src) - numRows = src.shape[0] - arr = src.ravel().astype(np.float64) - buf = bytearray(arr.tostring()) - createJavaObject(sc, 'dummy') - return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.convertPy4JArrayToMB(buf, numRows, numCols) else: - raise TypeError('sc needs to be of type SparkContext') # TODO: We can generalize this by creating py4j gateway ourselves - + src = np.asarray(src, dtype=np.float64) + numRowsPerBlock = int(math.ceil((maxSizeBlockInMB*1000000) / (src.shape[1]*8))) + # print("numRowsPerBlock=" + str(numRowsPerBlock)) + multiBlockTransfer = False if numRowsPerBlock >= src.shape[0] else True + if not multiBlockTransfer: + if isinstance(src, spmatrix): + return _convertSPMatrixToMB(sc, src) + elif isinstance(sc, SparkContext): + return _convertDenseMatrixToMB(sc, src) + else: + raise TypeError('sc needs to be of type SparkContext') + else: + if isinstance(src, spmatrix): + numRowsPerBlock = 1 # To avoid unnecessary conversion to csr and then coo again + rowBlocks = [ _convertSPMatrixToMB(sc, src.getrow(i)) for i in range(src.shape[0]) ] + isSparse = True + elif isinstance(sc, SparkContext): + rowBlocks = [ _convertDenseMatrixToMB(sc, src[i:i+numRowsPerBlock,]) for i in range(0, src.shape[0], numRowsPerBlock) ] + isSparse = False + else: + raise TypeError('sc needs to be of type SparkContext') + return sc._jvm.org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt.mergeRowBlocks(rowBlocks, int(numRowsPerBlock), int(src.shape[0]), int(src.shape[1]), isSparse) def convertToNumPyArr(sc, mb): if isinstance(sc, SparkContext):