Repository: incubator-systemml Updated Branches: refs/heads/master 6e13dd3b1 -> 697ea5e4f
[SYSTEMML-560] Frame converter between Matrix and Binary block Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/697ea5e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/697ea5e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/697ea5e4 Branch: refs/heads/master Commit: 697ea5e4fbbb3b109a7089a12b365d23de3422a1 Parents: 6e13dd3 Author: Arvind Surve <[email protected]> Authored: Mon May 9 23:58:30 2016 -0700 Committer: Arvind Surve <[email protected]> Committed: Mon May 9 23:58:30 2016 -0700 ---------------------------------------------------------------------- .../spark/utils/FrameRDDConverterUtils.java | 267 ++++++++++++++++++- .../sysml/runtime/matrix/data/FrameBlock.java | 2 +- .../matrix/mapred/FrameReblockBuffer.java | 42 +-- .../runtime/matrix/mapred/ReblockBuffer.java | 63 ++--- .../functions/frame/FrameConverterTest.java | 220 ++++++++++++++- 5 files changed, 503 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 02134c8..5f319b9 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -46,11 +46,16 @@ import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.matrix.mapred.FrameReblockBuffer; +import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.UtilFunctions; + + public class FrameRDDConverterUtils { //===================================== @@ -212,7 +217,85 @@ public class FrameRDDConverterUtils } } + //===================================== + // Matrix block <--> Binary block + + /** + * + * @param sc + * @param input + * @param mcIn + * @return + * @throws DMLRuntimeException + */ + public static JavaPairRDD<LongWritable, FrameBlock> matrixBlockToBinaryBlock(JavaSparkContext sc, + JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn) + throws DMLRuntimeException + { + //Do actual conversion + JavaPairRDD<Long, FrameBlock> output = matrixBlockToBinaryBlockLongIndex(sc,input, mcIn); + + //convert input rdd to serializable LongWritable/frame block + JavaPairRDD<LongWritable,FrameBlock> out = + output.mapToPair(new LongFrameToLongWritableFrameFunction()); + + return out; + } + + + /** + * + * @param sc + * @param input + * @param mcIn + * @return + * @throws DMLRuntimeException + */ + public static JavaPairRDD<Long, FrameBlock> matrixBlockToBinaryBlockLongIndex(JavaSparkContext sc, + JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn) + throws DMLRuntimeException + { + JavaPairRDD<Long, FrameBlock> out = null; + + if(mcIn.getCols() > mcIn.getColsPerBlock()) { + + out = input.flatMapToPair(new MatrixToBinaryBlockFunction(mcIn)); + + //aggregate partial frame blocks + if(mcIn.getCols() > mcIn.getColsPerBlock()) + out = RDDAggregateUtils.mergeByFrameKey( out ); //TODO: Will need better merger + } + else + out = input.mapToPair(new MatrixToBinaryBlockOneColumnBlockFunction(mcIn)); + + return out; + } + + + /** + * + * @param in + * @param mcIn + * @param props + * @param strict + * @return + */ + public static JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockToMatrixBlock(JavaPairRDD<LongWritable,FrameBlock> input, + MatrixCharacteristics mcIn, MatrixCharacteristics mcOut) + { + //convert binary block to matrix block + JavaPairRDD<MatrixIndexes, MatrixBlock> out = input + .flatMapToPair(new BinaryBlockToMatrixBlockFunction(mcIn, mcOut)); + + //aggregate partial matrix blocks + out = RDDAggregateUtils.mergeByKey( out ); + + return out; + } + + + ///////////////////////////////// // CSV-SPECIFIC FUNCTIONS @@ -303,7 +386,7 @@ public class FrameRDDConverterUtils private boolean _fill = false; private int _maxRowsPerBlock = -1; - protected static final int BUFFER_SIZE = 2 * 1000 * 1000; //2M elements + protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, size of default matrix block public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean hasHeader, String delim, boolean fill) @@ -437,7 +520,7 @@ public class FrameRDDConverterUtils private static final long serialVersionUID = -729614449626680946L; //internal buffer size (aligned w/ default matrix block size) - protected static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M elements (32MB) + protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements (8MB), size of default matrix block protected int _bufflen = -1; protected long _rlen = -1; @@ -518,5 +601,183 @@ public class FrameRDDConverterUtils return ret; } - } + } + + // MATRIX Block <---> Binary Block specific functions + private static class MatrixToBinaryBlockFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock> + { + private static final long serialVersionUID = 6205071301074768437L; + + private int _brlen = -1; + private int _bclen = -1; + private long _clen = -1; + private int _maxRowsPerBlock = -1; + + + protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements (Default matrix block size) + + + public MatrixToBinaryBlockFunction(MatrixCharacteristics mc) + { + _brlen = mc.getRowsPerBlock(); + _bclen = mc.getColsPerBlock(); + _clen = mc.getCols(); + _maxRowsPerBlock = Math.max((int) (BUFFER_SIZE/_clen), 1); + } + + @Override + public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) + throws Exception + { + ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); + + MatrixIndexes matrixIndexes = arg0._1(); + MatrixBlock matrixBlock = arg0._2(); + + //Frame Index (Row id, with base 1) + Long rowix = new Long((matrixIndexes.getRowIndex()-1)*_brlen+1); + + //Global index within frame blocks + long colixLow = (int)((matrixIndexes.getColumnIndex()-1)*_bclen+1); + long colixHigh = Math.min(colixLow+matrixBlock.getMaxColumn()-1, _clen); + + //Index within a local matrix block + int iColLowMat = UtilFunctions.computeCellInBlock(colixLow, _bclen); + int iColHighMat = UtilFunctions.computeCellInBlock(colixHigh, _bclen); + + FrameBlock tmpBlock = DataConverter.convertToFrameBlock(matrixBlock); + + int iRowLow = 0; //Index within a local frame block + while(iRowLow < matrixBlock.getMaxRow()) { + int iRowHigh = Math.min(iRowLow+_maxRowsPerBlock-1, matrixBlock.getMaxRow()-1); + + FrameBlock tmpBlock2 = null; + //All rows from matrix block can fit into single frame block, no need for slicing + if(iRowLow == 0 && iRowHigh == matrixBlock.getMaxRow()-1) + tmpBlock2 = tmpBlock; + else + tmpBlock2 = tmpBlock.sliceOperations(iRowLow, iRowHigh, iColLowMat, iColHighMat, tmpBlock2); + + //If Matrix has only one column block, then simply assigns converted block to frame block + if(colixLow == 0 && colixHigh == matrixBlock.getMaxColumn()-1) + ret.add(new Tuple2<Long, FrameBlock>(rowix+iRowLow, tmpBlock2)); + else + { + FrameBlock frameBlock = new FrameBlock((int)_clen, ValueType.STRING); + frameBlock.ensureAllocatedColumns(iRowHigh-iRowLow+1); + + frameBlock.copy(0, iRowHigh-iRowLow, (int)colixLow-1, (int)colixHigh-1, tmpBlock2); + ret.add(new Tuple2<Long, FrameBlock>(rowix+iRowLow, frameBlock)); + } + iRowLow = iRowHigh+1; + } + return ret; + } + } + + /* + * This function supports if matrix has only one column block. + */ + private static class MatrixToBinaryBlockOneColumnBlockFunction implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock> + { + private static final long serialVersionUID = 3716019666116660815L; + + private int _brlen = -1; + private int _bclen = -1; + private long _clen = -1; + + + public MatrixToBinaryBlockOneColumnBlockFunction(MatrixCharacteristics mc) + { + _brlen = mc.getRowsPerBlock(); + _bclen = mc.getColsPerBlock(); + _clen = mc.getCols(); + } + + @Override + public Tuple2<Long, FrameBlock> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) + throws Exception + { + if(_clen > _bclen) + throw new DMLRuntimeException("The input matrix has more than one column block, this function supports only one column block."); + + MatrixIndexes matrixIndexes = arg0._1(); + MatrixBlock matrixBlock = arg0._2(); + + //Frame Index (Row id, with base 1) + Long rowix = new Long((matrixIndexes.getRowIndex()-1)*_brlen+1); + + FrameBlock frameBlock = DataConverter.convertToFrameBlock(matrixBlock); + return new Tuple2<Long, FrameBlock>(rowix, frameBlock); + } + } + + + /** + * + */ + private static class BinaryBlockToMatrixBlockFunction implements PairFlatMapFunction<Tuple2<LongWritable,FrameBlock>,MatrixIndexes, MatrixBlock> + { + private static final long serialVersionUID = -2654986510471835933L; + + MatrixCharacteristics _mcIn, _mcOut; + + public BinaryBlockToMatrixBlockFunction(MatrixCharacteristics mcIn, + MatrixCharacteristics mcOut) { + + _mcIn = mcIn; //Frame Characteristics + _mcOut = mcOut; //Matrix Characteristics + } + + @Override + public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<LongWritable, FrameBlock> arg0) + throws Exception + { + long rowIndex = arg0._1().get(); + FrameBlock blk = arg0._2(); + + ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>(); + + int _brlenMatrix = _mcOut.getRowsPerBlock(); + int _bclenMatrix = _mcOut.getColsPerBlock(); + long _rlen = _mcIn.getRows(); + long _clen = _mcIn.getCols(); + + long lRowId = 0; + while (lRowId < blk.getNumRows()) { + // Global Row indices (indexes) across all frame blocks + long endRow = ((rowIndex+lRowId-1)/_brlenMatrix+1) * _brlenMatrix; + long begRow = Math.max(endRow-_brlenMatrix+1, 0); + endRow = Math.min(endRow, _rlen); + + // Local Row indices (indexes) within a matrix block + long begRowMat = UtilFunctions.computeCellInBlock(begRow, _brlenMatrix); + long endRowMat = UtilFunctions.computeCellInBlock(endRow, _brlenMatrix); + + long lColId = 0; + while (lColId < blk.getNumColumns()) { + // Global Column index across all frame blocks + long endCol = Math.min(lColId+_bclenMatrix-1, _clen-1); + + // Local Column indices (indexes) within a matrix block + long begColMat = UtilFunctions.computeCellInBlock(lColId+1, _bclenMatrix); + long endColMat = UtilFunctions.computeCellInBlock(endCol+1, _bclenMatrix); + + FrameBlock tmpFrame = new FrameBlock(); + tmpFrame = blk.sliceOperations((int)lRowId, (int)(lRowId+endRowMat-begRowMat), (int)lColId, (int)endCol, tmpFrame); + + MatrixIndexes matrixIndexes = new MatrixIndexes(UtilFunctions.computeBlockIndex(begRow+1, _brlenMatrix),UtilFunctions.computeBlockIndex(lColId+1, _bclenMatrix)); + + MatrixBlock matrixBlocktmp = DataConverter.convertToMatrixBlock(tmpFrame); + MatrixBlock matrixBlock = matrixBlocktmp.leftIndexingOperations(matrixBlocktmp, (int)begRowMat, (int)endRowMat, (int)begColMat, (int)endColMat, new MatrixBlock(), true); + ret.add(new Tuple2<MatrixIndexes, MatrixBlock>(matrixIndexes, matrixBlock)); + + lColId = endCol+1; + } + lRowId += (endRow-begRow+1); + } + + return ret; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 6323fea..d9cf8c3 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -636,7 +636,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable //general case w/ schema transformation else for( int i=rl; i<=ru; i++ ) { - String tmp = src.get(i-rl, j)!=null ? src.get(i-rl, j).toString() : null; + String tmp = src.get(i-rl, j-cl)!=null ? src.get(i-rl, j-cl).toString() : null; set(i, j, UtilFunctions.stringToObject(_schema.get(j), tmp)); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java index 9b2cd37..2762a0d 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java @@ -32,6 +32,7 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.util.UtilFunctions; /** * @@ -88,34 +89,11 @@ public class FrameReblockBuffer _rlen = rlen; _clen = clen; _brlen = Math.max((int)(_bufflen/_clen), 1); - _bclen = 1; + _bclen = (int)clen; _schema = schema; } - - /** - * - * @param ix - * @param blen - * @return - */ - private long getBlockIndex( long ix, int blen ) - { - return (ix-1)/_brlen+1; - } - - /** - * - * @param ix - * @param blen - * @return - */ - private int getIndexInBlock( long ix, int blen ) - { - return (int)((ix-1)%_brlen); - } - public int getSize() { return _count; @@ -194,8 +172,8 @@ public class FrameReblockBuffer long cbi = -1, cbj = -1; //current block indexes for( int i=0; i<_count; i++ ) { - long bi = getBlockIndex(_buff[i].getRow(), _brlen); - long bj = getBlockIndex(_buff[i].getCol(), _bclen); + long bi = UtilFunctions.computeBlockIndex(_buff[i].getRow(), _brlen); + long bj = UtilFunctions.computeBlockIndex(_buff[i].getCol(), _bclen); //output block and switch to next index pair if( bi != cbi || bj != cbj ) { @@ -206,8 +184,8 @@ public class FrameReblockBuffer tmpBlock.reset(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen))); } - int ci = getIndexInBlock(_buff[i].getRow(), _brlen); - int cj = getIndexInBlock(_buff[i].getCol(), _bclen); + int ci = UtilFunctions.computeCellInBlock(_buff[i].getRow(), _brlen); + int cj = UtilFunctions.computeCellInBlock(_buff[i].getCol(), _bclen); tmpBlock.set(ci, cj, _buff[i].getObjVal()); } @@ -240,8 +218,8 @@ public class FrameReblockBuffer long cbi = -1, cbj = -1; //current block indexes for( int i=0; i<_count; i++ ) { - long bi = getBlockIndex(_buff[i].getRow(), _brlen); - long bj = getBlockIndex(_buff[i].getCol(), _bclen); + long bi = UtilFunctions.computeBlockIndex(_buff[i].getRow(), _brlen); + long bj = UtilFunctions.computeBlockIndex(_buff[i].getCol(), _bclen); //output block and switch to next index pair if( bi != cbi || bj != cbj ) { @@ -255,8 +233,8 @@ public class FrameReblockBuffer } - int ci = getIndexInBlock(_buff[i].getRow(), _brlen); - int cj = getIndexInBlock(_buff[i].getCol(), _bclen); + int ci = UtilFunctions.computeCellInBlock(_buff[i].getRow(), _brlen); + int cj = UtilFunctions.computeCellInBlock(_buff[i].getCol(), _bclen); tmpBlock.set(ci, cj, _buff[i].getObjVal()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java index 3b7dd4a..98416ea 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java @@ -35,6 +35,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.PartialBlock; import org.apache.sysml.runtime.matrix.data.TaggedAdaptivePartialBlock; +import org.apache.sysml.runtime.util.UtilFunctions; /** * @@ -175,8 +176,8 @@ public class ReblockBuffer long cbi = -1, cbj = -1; //current block indexes for( int i=0; i<_count; i++ ) { - long bi = getBlockIndex(_buff[i][0], _brlen); - long bj = getBlockIndex(_buff[i][1], _bclen); + long bi = UtilFunctions.computeBlockIndex(_buff[i][0], _brlen); + long bj = UtilFunctions.computeBlockIndex(_buff[i][1], _bclen); //switch to next block if( bi != cbi || bj != cbj ) { @@ -208,8 +209,8 @@ public class ReblockBuffer cbi = -1; cbj = -1; //current block indexes for( int i=0; i<_count; i++ ) { - long bi = getBlockIndex(_buff[i][0], _brlen); - long bj = getBlockIndex(_buff[i][1], _bclen); + long bi = UtilFunctions.computeBlockIndex(_buff[i][0], _brlen); + long bj = UtilFunctions.computeBlockIndex(_buff[i][1], _bclen); //output block and switch to next index pair if( bi != cbi || bj != cbj ) { @@ -221,8 +222,8 @@ public class ReblockBuffer Math.min(_bclen, (int)(_clen-(bj-1)*_bclen)), sparse); } - int ci = getIndexInBlock(_buff[i][0], _brlen); - int cj = getIndexInBlock(_buff[i][1], _bclen); + int ci = UtilFunctions.computeCellInBlock(_buff[i][0], _brlen); + int cj = UtilFunctions.computeCellInBlock(_buff[i][1], _bclen); double tmp = Double.longBitsToDouble(_buff[i][2]); tmpBlock.appendValue(ci, cj, tmp); } @@ -236,10 +237,10 @@ public class ReblockBuffer outVal.set(tmpVal); for( int i=0; i<_count; i++ ) { - long bi = getBlockIndex(_buff[i][0], _brlen); - long bj = getBlockIndex(_buff[i][1], _bclen); - int ci = getIndexInBlock(_buff[i][0], _brlen); - int cj = getIndexInBlock(_buff[i][1], _bclen); + long bi = UtilFunctions.computeBlockIndex(_buff[i][0], _brlen); + long bj = UtilFunctions.computeBlockIndex(_buff[i][1], _bclen); + int ci = UtilFunctions.computeCellInBlock(_buff[i][0], _brlen); + int cj = UtilFunctions.computeCellInBlock(_buff[i][1], _bclen); double tmp = Double.longBitsToDouble(_buff[i][2]); tmpIx.setIndexes(bi, bj); tmpVal.set(ci, cj, tmp); //in outVal, in outTVal @@ -270,8 +271,8 @@ public class ReblockBuffer long cbi = -1, cbj = -1; //current block indexes for( int i=0; i<_count; i++ ) { - long bi = getBlockIndex(_buff[i][0], _brlen); - long bj = getBlockIndex(_buff[i][1], _bclen); + long bi = UtilFunctions.computeBlockIndex(_buff[i][0], _brlen); + long bj = UtilFunctions.computeBlockIndex(_buff[i][1], _bclen); //switch to next block if( bi != cbi || bj != cbj ) { @@ -290,8 +291,8 @@ public class ReblockBuffer cbi = -1; cbj = -1; //current block indexes for( int i=0; i<_count; i++ ) { - long bi = getBlockIndex(_buff[i][0], _brlen); - long bj = getBlockIndex(_buff[i][1], _bclen); + long bi = UtilFunctions.computeBlockIndex(_buff[i][0], _brlen); + long bj = UtilFunctions.computeBlockIndex(_buff[i][1], _bclen); //output block and switch to next index pair if( bi != cbi || bj != cbj ) { @@ -303,8 +304,8 @@ public class ReblockBuffer Math.min(_bclen, (int)(_clen-(bj-1)*_bclen)), sparse); } - int ci = getIndexInBlock(_buff[i][0], _brlen); - int cj = getIndexInBlock(_buff[i][1], _bclen); + int ci = UtilFunctions.computeCellInBlock(_buff[i][0], _brlen); + int cj = UtilFunctions.computeCellInBlock(_buff[i][1], _bclen); double tmp = Double.longBitsToDouble(_buff[i][2]); tmpBlock.appendValue(ci, cj, tmp); } @@ -317,28 +318,6 @@ public class ReblockBuffer /** * - * @param ix - * @param blen - * @return - */ - private static long getBlockIndex( long ix, int blen ) - { - return (ix-1)/blen+1; - } - - /** - * - * @param ix - * @param blen - * @return - */ - private static int getIndexInBlock( long ix, int blen ) - { - return (int)((ix-1)%blen); - } - - /** - * * @param out * @param key * @param value @@ -397,10 +376,10 @@ public class ReblockBuffer @Override public int compare(long[] arg0, long[] arg1) { - long bi0 = getBlockIndex( arg0[0], _brlen ); - long bj0 = getBlockIndex( arg0[1], _bclen ); - long bi1 = getBlockIndex( arg1[0], _brlen ); - long bj1 = getBlockIndex( arg1[1], _bclen ); + long bi0 = UtilFunctions.computeBlockIndex( arg0[0], _brlen ); + long bj0 = UtilFunctions.computeBlockIndex( arg0[1], _bclen ); + long bi1 = UtilFunctions.computeBlockIndex( arg1[0], _brlen ); + long bj1 = UtilFunctions.computeBlockIndex( arg1[1], _bclen ); return ( bi0 < bi1 || (bi0 == bi1 && bj0 < bj1) ) ? -1 : (( bi0 == bi1 && bj0 == bj1)? 0 : 1); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index 05d752c..71350cb 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -20,7 +20,9 @@ package org.apache.sysml.test.integration.functions.frame; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.hadoop.io.LongWritable; @@ -40,10 +42,16 @@ import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; import org.apache.sysml.runtime.io.FrameWriter; import org.apache.sysml.runtime.io.FrameWriterFactory; +import org.apache.sysml.runtime.io.MatrixReader; +import org.apache.sysml.runtime.io.MatrixReaderFactory; +import org.apache.sysml.runtime.io.MatrixWriter; +import org.apache.sysml.runtime.io.MatrixWriterFactory; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.UtilFunctions; @@ -53,6 +61,8 @@ import org.apache.sysml.test.utils.TestUtils; import org.junit.Assert; import org.junit.Test; + + public class FrameConverterTest extends AutomatedTestBase { private final static String TEST_DIR = "functions/frame/"; @@ -60,13 +70,32 @@ public class FrameConverterTest extends AutomatedTestBase private final static int rows = 1593; private final static ValueType[] schemaStrings = new ValueType[]{ValueType.STRING, ValueType.STRING, ValueType.STRING}; - private final static ValueType[] schemaMixed = new ValueType[]{ValueType.STRING, ValueType.DOUBLE, ValueType.INT, ValueType.BOOLEAN}; + private final static ValueType[] schemaMixed = new ValueType[]{ValueType.STRING, ValueType.DOUBLE, ValueType.INT, ValueType.BOOLEAN}; + + private final static List<ValueType> schemaMixedLargeListStr = Collections.nCopies(600, ValueType.STRING); + private final static List<ValueType> schemaMixedLargeListDble = Collections.nCopies(600, ValueType.DOUBLE); + private final static List<ValueType> schemaMixedLargeListInt = Collections.nCopies(600, ValueType.INT); + private final static List<ValueType> schemaMixedLargeListBool = Collections.nCopies(600, ValueType.BOOLEAN); + private static List<ValueType> schemaMixedLargeList = null; + static { + schemaMixedLargeList = new ArrayList<ValueType>(schemaMixedLargeListStr); + schemaMixedLargeList.addAll(schemaMixedLargeListDble); + schemaMixedLargeList.addAll(schemaMixedLargeListInt); + schemaMixedLargeList.addAll(schemaMixedLargeListBool); + } + + private static ValueType[] schemaMixedLarge = new ValueType[schemaMixedLargeList.size()]; + static { + schemaMixedLarge = (ValueType[]) schemaMixedLargeList.toArray(schemaMixedLarge); + } private enum ConvType { CSV2BIN, BIN2CSV, TXTCELL2BIN, - BIN2TXTCELL + BIN2TXTCELL, + MAT2BIN, + BIN2MAT, } @Override @@ -115,11 +144,41 @@ public class FrameConverterTest extends AutomatedTestBase runFrameConverterTest(schemaMixed, ConvType.BIN2TXTCELL); } + @Test + public void testFrameStringsMatrixBinSpark() { + runFrameConverterTest(schemaStrings, ConvType.MAT2BIN); + } + + @Test + public void testFrameMixedMatrixBinSpark() { + runFrameConverterTest(schemaMixed, ConvType.MAT2BIN); + } + + @Test + public void testFrameStringsBinMatrixSpark() { + runFrameConverterTest(schemaStrings, ConvType.BIN2MAT); + } + + @Test + public void testFrameMixedBinMatrixSpark() { + runFrameConverterTest(schemaMixed, ConvType.BIN2MAT); + } + + @Test + public void testFrameMixedMultiColBlkMatrixBinSpark() { + runFrameConverterTest(schemaMixedLarge, ConvType.MAT2BIN); + } + + @Test + public void testFrameMixedMultiColBlkBinMatrixSpark() { + runFrameConverterTest(schemaMixedLarge, ConvType.BIN2MAT); + } + /** * - * @param sparseM1 - * @param sparseM2 + * @param schema + * @param type * @param instType */ private void runFrameConverterTest( ValueType[] schema, ConvType type) @@ -160,10 +219,46 @@ public class FrameConverterTest extends AutomatedTestBase oinfo = OutputInfo.BinaryBlockOutputInfo; iinfo = InputInfo.TextCellInputInfo; break; + case MAT2BIN: + case BIN2MAT: + oinfo = OutputInfo.BinaryBlockOutputInfo; + iinfo = InputInfo.BinaryBlockInputInfo; + break; default: throw new RuntimeException("Unsuported converter type: "+type.toString()); } + + if(type == ConvType.MAT2BIN || type == ConvType.BIN2MAT) + runMatrixConverterAndVerify(schema, A, type, iinfo, oinfo); + else + runConverterAndVerify(schema, A, type, iinfo, oinfo); + + } + catch(Exception ex) { + ex.printStackTrace(); + throw new RuntimeException(ex); + } + finally + { + DMLScript.rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } + + /** + * + * @param schema + * @param A + * @param type + * @param iinfo + * @param oinfo + * @param instType + */ + private void runConverterAndVerify( ValueType[] schema, double[][] A, ConvType type, InputInfo iinfo, OutputInfo oinfo ) + { + try + { //initialize the frame data. List<ValueType> lschema = Arrays.asList(schema); FrameBlock frame1 = new FrameBlock(lschema); @@ -172,10 +267,10 @@ public class FrameConverterTest extends AutomatedTestBase //write frame data to hdfs FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo); writer.writeFrameToHDFS(frame1, input("A"), rows, schema.length); - + //run converter under test MatrixCharacteristics mc = new MatrixCharacteristics(rows, schema.length, -1, -1, -1); - runConverter(type, mc, Arrays.asList(schema), input("A"), output("B")); + runConverter(type, mc, null, Arrays.asList(schema), input("A"), output("B")); //read frame data from hdfs FrameReader reader = FrameReaderFactory.createFrameReader(iinfo); @@ -188,10 +283,73 @@ public class FrameConverterTest extends AutomatedTestBase ex.printStackTrace(); throw new RuntimeException(ex); } - finally + } + + /** + * + * @param schema + * @param A + * @param type + * @param iinfo + * @param oinfo + * @param instType + */ + private void runMatrixConverterAndVerify( ValueType[] schema, double[][] A, ConvType type, InputInfo iinfo, OutputInfo oinfo ) + { + try { - DMLScript.rtplatform = platformOld; - DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + MatrixCharacteristics mcMatrix = new MatrixCharacteristics(rows, schema.length, 1000, 1000, 0); + MatrixCharacteristics mcFrame = new MatrixCharacteristics(rows, schema.length, -1, -1, -1); + + MatrixBlock matrixBlock1 = null; + FrameBlock frame1 = null; + + if(type == ConvType.MAT2BIN) { + //initialize the matrix (dense) data. + matrixBlock1 = new MatrixBlock(rows, schema.length, false); + matrixBlock1.init(A, rows, schema.length); + + //write matrix data to hdfs + MatrixWriter matWriter = MatrixWriterFactory.createMatrixWriter(oinfo); + matWriter.writeMatrixToHDFS(matrixBlock1, input("A"), rows, schema.length, + mcMatrix.getRowsPerBlock(), mcMatrix.getColsPerBlock(), mcMatrix.getNonZeros()); + } + else { + //initialize the frame data. + List<ValueType> lschema = Arrays.asList(schema); + frame1 = new FrameBlock(lschema); + initFrameData(frame1, A, lschema); + + //write frame data to hdfs + FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo); + writer.writeFrameToHDFS(frame1, input("A"), rows, schema.length); + } + + //run converter under test + runConverter(type, mcFrame, mcMatrix, Arrays.asList(schema), input("A"), output("B")); + + if(type == ConvType.MAT2BIN) { + //read frame data from hdfs + FrameReader reader = FrameReaderFactory.createFrameReader(iinfo); + FrameBlock frame2 = reader.readFrameFromHDFS(output("B"), rows, schema.length); + + //verify input and output frame/matrix + verifyFrameMatrixData(frame2, matrixBlock1); + } + else { + //read matrix data from hdfs + MatrixReader matReader = MatrixReaderFactory.createMatrixReader(iinfo); + MatrixBlock matrixBlock2 = matReader.readMatrixFromHDFS(output("B"), rows, schema.length, + mcMatrix.getRowsPerBlock(), mcMatrix.getColsPerBlock(), mcMatrix.getNonZeros()); + + //verify input and output frame/matrix + verifyFrameMatrixData(frame1, matrixBlock2); + } + + } + catch(Exception ex) { + ex.printStackTrace(); + throw new RuntimeException(ex); } } @@ -222,13 +380,30 @@ public class FrameConverterTest extends AutomatedTestBase String val1 = UtilFunctions.objectToString(frame1.get(i, j)); String val2 = UtilFunctions.objectToString(frame2.get(i, j)); if( UtilFunctions.compareTo(ValueType.STRING, val1, val2) != 0) - Assert.fail("Target value for cell ("+ i + "," + j + ") is " + val1 + - ", is not same as original value " + val2); + Assert.fail("The original data for cell ("+ i + "," + j + ") is " + val1 + + ", not same as the converted value " + val2); } } + + /** + * + * @param frame1 + * @param frame2 + */ + private void verifyFrameMatrixData(FrameBlock frame, MatrixBlock matrix) { + for ( int i=0; i<frame.getNumRows(); i++ ) + for( int j=0; j<frame.getNumColumns(); j++ ) { + Object val1 = UtilFunctions.doubleToObject(frame.getSchema().get(j), + UtilFunctions.objectToDouble(frame.getSchema().get(j), frame.get(i, j))); + Object val2 = UtilFunctions.doubleToObject(frame.getSchema().get(j), matrix.getValue(i, j)); + if(( UtilFunctions.compareTo(frame.getSchema().get(j), val1, val2)) != 0) + Assert.fail("Frame value for cell ("+ i + "," + j + ") is " + val1 + + ", is not same as matrix value " + val2); + } + } - + /** * @param oinfo * @param frame1 @@ -240,7 +415,8 @@ public class FrameConverterTest extends AutomatedTestBase */ @SuppressWarnings("unchecked") - private void runConverter(ConvType type, MatrixCharacteristics mc, List<ValueType> schema, String fnameIn, String fnameOut) + private void runConverter(ConvType type, MatrixCharacteristics mc, MatrixCharacteristics mcMatrix, + List<ValueType> schema, String fnameIn, String fnameOut) throws DMLRuntimeException, IOException { SparkExecutionContext sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); @@ -282,6 +458,24 @@ public class FrameConverterTest extends AutomatedTestBase rddOut.saveAsTextFile(fnameOut); break; } + case MAT2BIN: { + InputInfo iinfo = InputInfo.BinaryBlockInputInfo; + OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; + JavaPairRDD<MatrixIndexes,MatrixBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass); + JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils.matrixBlockToBinaryBlock(sc, rddIn, mcMatrix); + rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); + break; + } + case BIN2MAT: { + InputInfo iinfo = InputInfo.BinaryBlockInputInfo; + OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; + JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class); + JavaPairRDD<MatrixIndexes,MatrixBlock> rddOut = FrameRDDConverterUtils.binaryBlockToMatrixBlock(rddIn, mc, mcMatrix); + rddOut.saveAsHadoopFile(fnameOut, MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass); + break; + } + default: + throw new RuntimeException("Unsuported converter type: "+type.toString()); } sec.close();
