Repository: incubator-systemml
Updated Branches:
  refs/heads/master 6e13dd3b1 -> 697ea5e4f


[SYSTEMML-560] Frame converter between Matrix and Binary block


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/697ea5e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/697ea5e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/697ea5e4

Branch: refs/heads/master
Commit: 697ea5e4fbbb3b109a7089a12b365d23de3422a1
Parents: 6e13dd3
Author: Arvind Surve <[email protected]>
Authored: Mon May 9 23:58:30 2016 -0700
Committer: Arvind Surve <[email protected]>
Committed: Mon May 9 23:58:30 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     | 267 ++++++++++++++++++-
 .../sysml/runtime/matrix/data/FrameBlock.java   |   2 +-
 .../matrix/mapred/FrameReblockBuffer.java       |  42 +--
 .../runtime/matrix/mapred/ReblockBuffer.java    |  63 ++---
 .../functions/frame/FrameConverterTest.java     | 220 ++++++++++++++-
 5 files changed, 503 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 02134c8..5f319b9 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -46,11 +46,16 @@ import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.matrix.mapred.FrameReblockBuffer;
+import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
+
+
 public class FrameRDDConverterUtils 
 {
        //=====================================
@@ -212,7 +217,85 @@ public class FrameRDDConverterUtils
                }
        }
        
+       //=====================================
+       // Matrix block <--> Binary block
+
+       /**
+        * Converts a blocked matrix RDD into a frame RDD whose keys are 
+        * serializable {@link LongWritable} row indexes.
+        * 
+        * @param sc spark context
+        * @param input matrix RDD of (block indexes, matrix block) pairs
+        * @param mcIn matrix characteristics of the input
+        * @return frame RDD of (row index, frame block) pairs
+        * @throws DMLRuntimeException if the conversion fails
+        */
+       public static JavaPairRDD<LongWritable, FrameBlock> matrixBlockToBinaryBlock(JavaSparkContext sc,
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn)
+               throws DMLRuntimeException 
+       {
+               //convert to Long-keyed frame blocks, then wrap the keys
+               //into serializable LongWritable/frame block pairs
+               return matrixBlockToBinaryBlockLongIndex(sc, input, mcIn)
+                               .mapToPair(new LongFrameToLongWritableFrameFunction());
+       }
+       
+
+       /**
+        * Converts a blocked matrix RDD into a frame RDD keyed by the 1-based
+        * start row index of each frame block.
+        * 
+        * If the matrix spans multiple column blocks, every matrix block is 
+        * converted into partial frame blocks which are then merged by key; 
+        * otherwise each matrix block is converted 1:1 into a frame block.
+        * 
+        * @param sc spark context
+        * @param input matrix RDD of (block indexes, matrix block) pairs
+        * @param mcIn matrix characteristics of the input
+        * @return frame RDD of (1-based start row, frame block) pairs
+        * @throws DMLRuntimeException if the conversion fails
+        */
+       public static JavaPairRDD<Long, FrameBlock> matrixBlockToBinaryBlockLongIndex(JavaSparkContext sc,
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> input, MatrixCharacteristics mcIn)
+               throws DMLRuntimeException 
+       {
+               //single column block: direct 1:1 block conversion, no merge needed
+               if( mcIn.getCols() <= mcIn.getColsPerBlock() )
+                       return input.mapToPair(new MatrixToBinaryBlockOneColumnBlockFunction(mcIn));
+               
+               //multiple column blocks: create partial frame blocks per matrix block
+               JavaPairRDD<Long, FrameBlock> out = 
+                               input.flatMapToPair(new MatrixToBinaryBlockFunction(mcIn));
+               
+               //aggregate partial frame blocks by row key (TODO: will need better merger)
+               return RDDAggregateUtils.mergeByFrameKey( out );
+       }
+       
 
+       
+       /**
+        * Converts a frame RDD, keyed by the 1-based start row of each frame 
+        * block, into a blocked matrix RDD.
+        * 
+        * @param input frame RDD of (start row index, frame block) pairs
+        * @param mcIn characteristics of the input frame
+        * @param mcOut characteristics of the output matrix (block sizes)
+        * @return matrix RDD of (block indexes, matrix block) pairs
+        */
+       public static JavaPairRDD<MatrixIndexes, MatrixBlock> binaryBlockToMatrixBlock(JavaPairRDD<LongWritable,FrameBlock> input, 
+                       MatrixCharacteristics mcIn, MatrixCharacteristics mcOut) 
+       {
+               //convert frame blocks into partial matrix blocks
+               JavaPairRDD<MatrixIndexes, MatrixBlock> out = input
+                               .flatMapToPair(new BinaryBlockToMatrixBlockFunction(mcIn, mcOut));
+       
+               //aggregate partial matrix blocks that share the same block indexes
+               out = RDDAggregateUtils.mergeByKey( out );
+               
+               return out;
+       }
+       
+
+       
        /////////////////////////////////
        // CSV-SPECIFIC FUNCTIONS
        
@@ -303,7 +386,7 @@ public class FrameRDDConverterUtils
                private boolean _fill = false;
                private int _maxRowsPerBlock = -1; 
                
-               protected static final int BUFFER_SIZE = 2 * 1000 * 1000; //2M 
elements 
+               protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M 
elements, size of default matrix block 
 
                
                public CSVToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean hasHeader, String delim, boolean fill)
@@ -437,7 +520,7 @@ public class FrameRDDConverterUtils
                private static final long serialVersionUID = 
-729614449626680946L;
 
                //internal buffer size (aligned w/ default matrix block size)
-               protected static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M 
elements (32MB)
+               protected static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M 
elements (8MB), size of default matrix block
                protected int _bufflen = -1;
                
                protected long _rlen = -1;
@@ -518,5 +601,183 @@ public class FrameRDDConverterUtils
                
                        return ret;
                }
-       }       
+       }
+       
+       // MATRIX Block <---> Binary Block specific functions
+       /**
+        * Converts a single matrix block into one or more frame blocks keyed by 
+        * their global 1-based start row. Used if the matrix spans multiple 
+        * column blocks: each output frame block has all matrix columns, with 
+        * only the columns of the given matrix block populated, and therefore 
+        * must be merged by key afterwards.
+        */
+       private static class MatrixToBinaryBlockFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock>
+       {
+               private static final long serialVersionUID = 6205071301074768437L;
+
+               //1M elements (default matrix block size), bounds the number of rows per output frame block
+               protected static final int BUFFER_SIZE = 1 * 1000 * 1000;
+
+               private int _brlen = -1;
+               private int _bclen = -1;
+               private long _clen = -1;
+               private int _maxRowsPerBlock = -1;
+               
+               public MatrixToBinaryBlockFunction(MatrixCharacteristics mc)
+               {
+                       _brlen = mc.getRowsPerBlock();
+                       _bclen = mc.getColsPerBlock();
+                       _clen = mc.getCols();
+                       _maxRowsPerBlock = Math.max((int) (BUFFER_SIZE/_clen), 1);
+               }
+
+               @Override
+               public Iterable<Tuple2<Long, FrameBlock>> call(Tuple2<MatrixIndexes,MatrixBlock> arg0) 
+                       throws Exception 
+               {
+                       ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>();
+
+                       MatrixIndexes matrixIndexes = arg0._1();
+                       MatrixBlock matrixBlock = arg0._2();
+                       
+                       //frame index: global 1-based start row of this matrix block
+                       long rowix = (matrixIndexes.getRowIndex()-1)*_brlen+1;
+
+                       //global 1-based column range of this matrix block
+                       //(pure long arithmetic, no int truncation of large column indexes)
+                       long colixLow = (matrixIndexes.getColumnIndex()-1)*_bclen+1;
+                       long colixHigh = Math.min(colixLow+matrixBlock.getMaxColumn()-1, _clen);
+                       
+                       //0-based column range within the local matrix block
+                       int iColLowMat = UtilFunctions.computeCellInBlock(colixLow, _bclen);
+                       int iColHighMat = UtilFunctions.computeCellInBlock(colixHigh, _bclen);
+
+                       //matrix block covers all matrix columns (1-based range [1, _clen])
+                       boolean allColumns = (colixLow == 1 && colixHigh == _clen);
+
+                       FrameBlock tmpBlock = DataConverter.convertToFrameBlock(matrixBlock);
+
+                       //split the block into row ranges of at most _maxRowsPerBlock rows
+                       for( int iRowLow = 0; iRowLow < matrixBlock.getMaxRow(); iRowLow += _maxRowsPerBlock ) {
+                               int iRowHigh = Math.min(iRowLow+_maxRowsPerBlock-1, matrixBlock.getMaxRow()-1);
+                               
+                               //all rows fit into a single frame block, no need for slicing
+                               FrameBlock tmpBlock2 = null;
+                               if( iRowLow == 0 && iRowHigh == matrixBlock.getMaxRow()-1 )
+                                       tmpBlock2 = tmpBlock;
+                               else
+                                       tmpBlock2 = tmpBlock.sliceOperations(iRowLow, iRowHigh, iColLowMat, iColHighMat, tmpBlock2);
+                               
+                               if( allColumns ) {
+                                       //slice already holds all columns, emit it as is
+                                       ret.add(new Tuple2<Long, FrameBlock>(rowix+iRowLow, tmpBlock2));
+                               }
+                               else {
+                                       //copy slice into its column range of a full-width frame block
+                                       FrameBlock frameBlock = new FrameBlock((int)_clen, ValueType.STRING);
+                                       frameBlock.ensureAllocatedColumns(iRowHigh-iRowLow+1);
+                                       frameBlock.copy(0, iRowHigh-iRowLow, (int)colixLow-1, (int)colixHigh-1, tmpBlock2);
+                                       ret.add(new Tuple2<Long, FrameBlock>(rowix+iRowLow, frameBlock));
+                               }
+                       }
+                       return ret;
+               }
+       }
+
+       /*
+        * This function supports if matrix has only one column block.
+        */
+       private static class MatrixToBinaryBlockOneColumnBlockFunction 
implements PairFunction<Tuple2<MatrixIndexes,MatrixBlock>,Long,FrameBlock>
+       {
+               private static final long serialVersionUID = 
3716019666116660815L;
+
+               private int _brlen = -1;
+               private int _bclen = -1;
+               private long _clen = -1;
+       
+               
+               public 
MatrixToBinaryBlockOneColumnBlockFunction(MatrixCharacteristics mc)
+               {
+                       _brlen = mc.getRowsPerBlock();
+                       _bclen = mc.getColsPerBlock();
+                       _clen = mc.getCols();
+               }
+
+               @Override
+               public Tuple2<Long, FrameBlock> 
call(Tuple2<MatrixIndexes,MatrixBlock> arg0) 
+                       throws Exception 
+               {
+                       if(_clen > _bclen)
+                               throw new DMLRuntimeException("The input matrix 
has more than one column block, this function supports only one column block.");
+
+                       MatrixIndexes matrixIndexes = arg0._1();
+                       MatrixBlock matrixBlock = arg0._2();
+                       
+                       //Frame Index (Row id, with base 1)
+                       Long rowix = new 
Long((matrixIndexes.getRowIndex()-1)*_brlen+1);
+
+                       FrameBlock frameBlock = 
DataConverter.convertToFrameBlock(matrixBlock);
+                       return new Tuple2<Long, FrameBlock>(rowix, frameBlock);
+               }
+       }
+
+       
+       /**
+        * 
+        */
+       private static class BinaryBlockToMatrixBlockFunction implements 
PairFlatMapFunction<Tuple2<LongWritable,FrameBlock>,MatrixIndexes, MatrixBlock> 
+       {
+               private static final long serialVersionUID = 
-2654986510471835933L;
+               
+               MatrixCharacteristics _mcIn, _mcOut;
+
+               public BinaryBlockToMatrixBlockFunction(MatrixCharacteristics 
mcIn,
+                               MatrixCharacteristics mcOut) {
+                               
+                               _mcIn = mcIn;           //Frame Characteristics
+                               _mcOut = mcOut;         //Matrix Characteristics
+               }
+
+               @Override
+               public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> 
call(Tuple2<LongWritable, FrameBlock> arg0)
+                       throws Exception 
+               {
+                       long rowIndex = arg0._1().get();
+                       FrameBlock blk = arg0._2();
+                       
+                       ArrayList<Tuple2<MatrixIndexes, MatrixBlock>> ret = new 
ArrayList<Tuple2<MatrixIndexes, MatrixBlock>>();
+                       
+                       int _brlenMatrix = _mcOut.getRowsPerBlock();
+                       int _bclenMatrix = _mcOut.getColsPerBlock();
+                       long _rlen = _mcIn.getRows();
+                       long _clen = _mcIn.getCols();
+                       
+                       long lRowId = 0;
+                       while (lRowId < blk.getNumRows()) {
+                               // Global Row indices (indexes) across all 
frame blocks  
+                               long endRow = 
((rowIndex+lRowId-1)/_brlenMatrix+1) * _brlenMatrix;
+                               long begRow = Math.max(endRow-_brlenMatrix+1, 
0);
+                               endRow = Math.min(endRow, _rlen);
+                               
+                               // Local Row indices (indexes) within a matrix 
block  
+                               long begRowMat = 
UtilFunctions.computeCellInBlock(begRow, _brlenMatrix);
+                               long endRowMat = 
UtilFunctions.computeCellInBlock(endRow, _brlenMatrix);
+                               
+                               long lColId = 0;
+                               while (lColId < blk.getNumColumns()) {
+                                       // Global Column index across all frame 
blocks  
+                                       long endCol = 
Math.min(lColId+_bclenMatrix-1, _clen-1);
+
+                                       // Local Column indices (indexes) 
within a matrix block  
+                                       long begColMat = 
UtilFunctions.computeCellInBlock(lColId+1, _bclenMatrix);
+                                       long endColMat = 
UtilFunctions.computeCellInBlock(endCol+1, _bclenMatrix);
+
+                                       FrameBlock tmpFrame = new FrameBlock();
+                                       tmpFrame = 
blk.sliceOperations((int)lRowId, (int)(lRowId+endRowMat-begRowMat), 
(int)lColId, (int)endCol, tmpFrame);
+
+                                       MatrixIndexes matrixIndexes = new 
MatrixIndexes(UtilFunctions.computeBlockIndex(begRow+1, 
_brlenMatrix),UtilFunctions.computeBlockIndex(lColId+1, _bclenMatrix));
+
+                                       MatrixBlock matrixBlocktmp = 
DataConverter.convertToMatrixBlock(tmpFrame);
+                                       MatrixBlock matrixBlock = 
matrixBlocktmp.leftIndexingOperations(matrixBlocktmp, (int)begRowMat, 
(int)endRowMat, (int)begColMat, (int)endColMat, new MatrixBlock(), true);
+                                       ret.add(new Tuple2<MatrixIndexes, 
MatrixBlock>(matrixIndexes, matrixBlock));
+                                       
+                                       lColId = endCol+1;
+                               }
+                               lRowId += (endRow-begRow+1);
+                       }
+                       
+                       return ret;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 6323fea..d9cf8c3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -636,7 +636,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                        //general case w/ schema transformation
                        else 
                                for( int i=rl; i<=ru; i++ ) {
-                                       String tmp = src.get(i-rl, j)!=null ? 
src.get(i-rl, j).toString() : null;
+                                       String tmp = src.get(i-rl, j-cl)!=null 
? src.get(i-rl, j-cl).toString() : null;
                                        set(i, j, 
UtilFunctions.stringToObject(_schema.get(j), tmp));
                                }
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
index 9b2cd37..2762a0d 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
@@ -32,6 +32,7 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
  * 
@@ -88,34 +89,11 @@ public class FrameReblockBuffer
                _rlen = rlen;
                _clen = clen;
                _brlen = Math.max((int)(_bufflen/_clen), 1);
-               _bclen = 1;
+               _bclen = (int)clen;
                
                _schema = schema;
        }
        
-
-       /**
-        * 
-        * @param ix
-        * @param blen
-        * @return
-        */
-       private long getBlockIndex( long ix, int blen )
-       {
-               return (ix-1)/_brlen+1;
-       }
-       
-       /**
-        * 
-        * @param ix
-        * @param blen
-        * @return
-        */
-       private int getIndexInBlock( long ix, int blen )
-       {
-               return (int)((ix-1)%_brlen);
-       }
-       
        public int getSize()
        {
                return _count;
@@ -194,8 +172,8 @@ public class FrameReblockBuffer
                long cbi = -1, cbj = -1; //current block indexes
                for( int i=0; i<_count; i++ )
                {
-                       long bi = getBlockIndex(_buff[i].getRow(), _brlen);
-                       long bj = getBlockIndex(_buff[i].getCol(), _bclen);
+                       long bi = 
UtilFunctions.computeBlockIndex(_buff[i].getRow(), _brlen);
+                       long bj = 
UtilFunctions.computeBlockIndex(_buff[i].getCol(), _bclen);
                        
                        //output block and switch to next index pair
                        if( bi != cbi || bj != cbj ) {
@@ -206,8 +184,8 @@ public class FrameReblockBuffer
                                tmpBlock.reset(Math.min(_brlen, 
(int)(_rlen-(bi-1)*_brlen)));
                        }
                        
-                       int ci = getIndexInBlock(_buff[i].getRow(), _brlen);
-                       int cj = getIndexInBlock(_buff[i].getCol(), _bclen);
+                       int ci = 
UtilFunctions.computeCellInBlock(_buff[i].getRow(), _brlen);
+                       int cj = 
UtilFunctions.computeCellInBlock(_buff[i].getCol(), _bclen);
                        tmpBlock.set(ci, cj, _buff[i].getObjVal());
                }
                
@@ -240,8 +218,8 @@ public class FrameReblockBuffer
                long cbi = -1, cbj = -1; //current block indexes
                for( int i=0; i<_count; i++ )
                {
-                       long bi = getBlockIndex(_buff[i].getRow(), _brlen);
-                       long bj = getBlockIndex(_buff[i].getCol(), _bclen);
+                       long bi = 
UtilFunctions.computeBlockIndex(_buff[i].getRow(), _brlen);
+                       long bj = 
UtilFunctions.computeBlockIndex(_buff[i].getCol(), _bclen);
                        
                        //output block and switch to next index pair
                        if( bi != cbi || bj != cbj ) {
@@ -255,8 +233,8 @@ public class FrameReblockBuffer
                                
                        }
                        
-                       int ci = getIndexInBlock(_buff[i].getRow(), _brlen);
-                       int cj = getIndexInBlock(_buff[i].getCol(), _bclen);
+                       int ci = 
UtilFunctions.computeCellInBlock(_buff[i].getRow(), _brlen);
+                       int cj = 
UtilFunctions.computeCellInBlock(_buff[i].getCol(), _bclen);
                        tmpBlock.set(ci, cj, _buff[i].getObjVal());
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
index 3b7dd4a..98416ea 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/ReblockBuffer.java
@@ -35,6 +35,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.PartialBlock;
 import org.apache.sysml.runtime.matrix.data.TaggedAdaptivePartialBlock;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
  * 
@@ -175,8 +176,8 @@ public class ReblockBuffer
                long cbi = -1, cbj = -1; //current block indexes
                for( int i=0; i<_count; i++ )
                {
-                       long bi = getBlockIndex(_buff[i][0], _brlen);
-                       long bj = getBlockIndex(_buff[i][1], _bclen);
+                       long bi = UtilFunctions.computeBlockIndex(_buff[i][0], 
_brlen);
+                       long bj = UtilFunctions.computeBlockIndex(_buff[i][1], 
_bclen);
                        
                        //switch to next block
                        if( bi != cbi || bj != cbj ) {
@@ -208,8 +209,8 @@ public class ReblockBuffer
                        cbi = -1; cbj = -1; //current block indexes
                        for( int i=0; i<_count; i++ )
                        {
-                               long bi = getBlockIndex(_buff[i][0], _brlen);
-                               long bj = getBlockIndex(_buff[i][1], _bclen);
+                               long bi = 
UtilFunctions.computeBlockIndex(_buff[i][0], _brlen);
+                               long bj = 
UtilFunctions.computeBlockIndex(_buff[i][1], _bclen);
                                
                                //output block and switch to next index pair
                                if( bi != cbi || bj != cbj ) {
@@ -221,8 +222,8 @@ public class ReblockBuffer
                                                               Math.min(_bclen, 
(int)(_clen-(bj-1)*_bclen)), sparse);
                                }
                                
-                               int ci = getIndexInBlock(_buff[i][0], _brlen);
-                               int cj = getIndexInBlock(_buff[i][1], _bclen);
+                               int ci = 
UtilFunctions.computeCellInBlock(_buff[i][0], _brlen);
+                               int cj = 
UtilFunctions.computeCellInBlock(_buff[i][1], _bclen);
                                double tmp = 
Double.longBitsToDouble(_buff[i][2]);
                                tmpBlock.appendValue(ci, cj, tmp); 
                        }
@@ -236,10 +237,10 @@ public class ReblockBuffer
                        outVal.set(tmpVal);
                        for( int i=0; i<_count; i++ )
                        {
-                               long bi = getBlockIndex(_buff[i][0], _brlen);
-                               long bj = getBlockIndex(_buff[i][1], _bclen);
-                               int ci = getIndexInBlock(_buff[i][0], _brlen);
-                               int cj = getIndexInBlock(_buff[i][1], _bclen);
+                               long bi = 
UtilFunctions.computeBlockIndex(_buff[i][0], _brlen);
+                               long bj = 
UtilFunctions.computeBlockIndex(_buff[i][1], _bclen);
+                               int ci = 
UtilFunctions.computeCellInBlock(_buff[i][0], _brlen);
+                               int cj = 
UtilFunctions.computeCellInBlock(_buff[i][1], _bclen);
                                double tmp = 
Double.longBitsToDouble(_buff[i][2]);
                                tmpIx.setIndexes(bi, bj);
                                tmpVal.set(ci, cj, tmp); //in outVal, in outTVal
@@ -270,8 +271,8 @@ public class ReblockBuffer
                long cbi = -1, cbj = -1; //current block indexes
                for( int i=0; i<_count; i++ )
                {
-                       long bi = getBlockIndex(_buff[i][0], _brlen);
-                       long bj = getBlockIndex(_buff[i][1], _bclen);
+                       long bi = UtilFunctions.computeBlockIndex(_buff[i][0], 
_brlen);
+                       long bj = UtilFunctions.computeBlockIndex(_buff[i][1], 
_bclen);
                        
                        //switch to next block
                        if( bi != cbi || bj != cbj ) {
@@ -290,8 +291,8 @@ public class ReblockBuffer
                cbi = -1; cbj = -1; //current block indexes
                for( int i=0; i<_count; i++ )
                {
-                       long bi = getBlockIndex(_buff[i][0], _brlen);
-                       long bj = getBlockIndex(_buff[i][1], _bclen);
+                       long bi = UtilFunctions.computeBlockIndex(_buff[i][0], 
_brlen);
+                       long bj = UtilFunctions.computeBlockIndex(_buff[i][1], 
_bclen);
                        
                        //output block and switch to next index pair
                        if( bi != cbi || bj != cbj ) {
@@ -303,8 +304,8 @@ public class ReblockBuffer
                                                       Math.min(_bclen, 
(int)(_clen-(bj-1)*_bclen)), sparse);
                        }
                        
-                       int ci = getIndexInBlock(_buff[i][0], _brlen);
-                       int cj = getIndexInBlock(_buff[i][1], _bclen);
+                       int ci = UtilFunctions.computeCellInBlock(_buff[i][0], 
_brlen);
+                       int cj = UtilFunctions.computeCellInBlock(_buff[i][1], 
_bclen);
                        double tmp = Double.longBitsToDouble(_buff[i][2]);
                        tmpBlock.appendValue(ci, cj, tmp); 
                }
@@ -317,28 +318,6 @@ public class ReblockBuffer
        
        /**
         * 
-        * @param ix
-        * @param blen
-        * @return
-        */
-       private static long getBlockIndex( long ix, int blen )
-       {
-               return (ix-1)/blen+1;
-       }
-       
-       /**
-        * 
-        * @param ix
-        * @param blen
-        * @return
-        */
-       private static int getIndexInBlock( long ix, int blen )
-       {
-               return (int)((ix-1)%blen);
-       }
-       
-       /**
-        * 
         * @param out
         * @param key
         * @param value
@@ -397,10 +376,10 @@ public class ReblockBuffer
                @Override
                public int compare(long[] arg0, long[] arg1) 
                {
-                       long bi0 = getBlockIndex( arg0[0], _brlen );
-                       long bj0 = getBlockIndex( arg0[1], _bclen );
-                       long bi1 = getBlockIndex( arg1[0], _brlen );
-                       long bj1 = getBlockIndex( arg1[1], _bclen );
+                       long bi0 = UtilFunctions.computeBlockIndex( arg0[0], 
_brlen );
+                       long bj0 = UtilFunctions.computeBlockIndex( arg0[1], 
_bclen );
+                       long bi1 = UtilFunctions.computeBlockIndex( arg1[0], 
_brlen );
+                       long bj1 = UtilFunctions.computeBlockIndex( arg1[1], 
_bclen );
                        
                        return ( bi0 < bi1 || (bi0 == bi1 && bj0 < bj1) ) ? -1 :
                    (( bi0 == bi1 && bj0 == bj1)? 0 : 1);               

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/697ea5e4/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 05d752c..71350cb 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -20,7 +20,9 @@
 package org.apache.sysml.test.integration.functions.frame;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
@@ -40,10 +42,16 @@ import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
 import org.apache.sysml.runtime.io.FrameWriter;
 import org.apache.sysml.runtime.io.FrameWriterFactory;
+import org.apache.sysml.runtime.io.MatrixReader;
+import org.apache.sysml.runtime.io.MatrixReaderFactory;
+import org.apache.sysml.runtime.io.MatrixWriter;
+import org.apache.sysml.runtime.io.MatrixWriterFactory;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -53,6 +61,8 @@ import org.apache.sysml.test.utils.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+
+
 public class FrameConverterTest extends AutomatedTestBase
 {
        private final static String TEST_DIR = "functions/frame/";
@@ -60,13 +70,32 @@ public class FrameConverterTest extends AutomatedTestBase
 
        private final static int rows = 1593;
        private final static ValueType[] schemaStrings = new 
ValueType[]{ValueType.STRING, ValueType.STRING, ValueType.STRING}; 
-       private final static ValueType[] schemaMixed = new 
ValueType[]{ValueType.STRING, ValueType.DOUBLE, ValueType.INT, 
ValueType.BOOLEAN};   
+       private final static ValueType[] schemaMixed = new 
ValueType[]{ValueType.STRING, ValueType.DOUBLE, ValueType.INT, 
ValueType.BOOLEAN};
+
+       private final static List<ValueType> schemaMixedLargeListStr = 
Collections.nCopies(600, ValueType.STRING);
+       private final static List<ValueType> schemaMixedLargeListDble  = 
Collections.nCopies(600, ValueType.DOUBLE);
+       private final static List<ValueType> schemaMixedLargeListInt  = 
Collections.nCopies(600, ValueType.INT);
+       private final static List<ValueType> schemaMixedLargeListBool  = 
Collections.nCopies(600, ValueType.BOOLEAN);
+       private static List<ValueType> schemaMixedLargeList = null;
+       static {
+               schemaMixedLargeList = new 
ArrayList<ValueType>(schemaMixedLargeListStr);
+               schemaMixedLargeList.addAll(schemaMixedLargeListDble);
+               schemaMixedLargeList.addAll(schemaMixedLargeListInt);
+               schemaMixedLargeList.addAll(schemaMixedLargeListBool);
+       }
+
+       private static ValueType[] schemaMixedLarge = new 
ValueType[schemaMixedLargeList.size()];
+       static {
+               schemaMixedLarge = (ValueType[]) 
schemaMixedLargeList.toArray(schemaMixedLarge);
+       }
        
        private enum ConvType {
                CSV2BIN,
                BIN2CSV,
                TXTCELL2BIN,
-               BIN2TXTCELL
+               BIN2TXTCELL,
+               MAT2BIN,
+               BIN2MAT,
        }
        
        @Override
@@ -115,11 +144,41 @@ public class FrameConverterTest extends AutomatedTestBase
                runFrameConverterTest(schemaMixed, ConvType.BIN2TXTCELL);
        }
 
+       @Test
+       public void testFrameStringsMatrixBinSpark()  {
+               runFrameConverterTest(schemaStrings, ConvType.MAT2BIN);
+       }
+       
+       @Test
+       public void testFrameMixedMatrixBinSpark()  {
+               runFrameConverterTest(schemaMixed, ConvType.MAT2BIN);
+       }
+       
+       @Test
+       public void testFrameStringsBinMatrixSpark()  {
+               runFrameConverterTest(schemaStrings, ConvType.BIN2MAT);
+       }
+       
+       @Test
+       public void testFrameMixedBinMatrixSpark()  {
+               runFrameConverterTest(schemaMixed, ConvType.BIN2MAT);
+       }
+       
+       @Test
+       public void testFrameMixedMultiColBlkMatrixBinSpark()  {
+               runFrameConverterTest(schemaMixedLarge, ConvType.MAT2BIN);
+       }
+       
+       @Test
+       public void testFrameMixedMultiColBlkBinMatrixSpark()  {
+               runFrameConverterTest(schemaMixedLarge, ConvType.BIN2MAT);
+       }
+       
        
        /**
         * 
-        * @param sparseM1
-        * @param sparseM2
+        * @param schema
+        * @param type
         * @param instType
         */
        private void runFrameConverterTest( ValueType[] schema, ConvType type)
@@ -160,10 +219,46 @@ public class FrameConverterTest extends AutomatedTestBase
                                        oinfo = 
OutputInfo.BinaryBlockOutputInfo;
                                        iinfo = InputInfo.TextCellInputInfo;
                                        break;
+                               case MAT2BIN: 
+                               case BIN2MAT:
+                                       oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+                                       iinfo = InputInfo.BinaryBlockInputInfo;
+                                       break;
                                default: 
                                        throw new RuntimeException("Unsuported 
converter type: "+type.toString());
                        }
                        
+                       
+                       if(type == ConvType.MAT2BIN || type == ConvType.BIN2MAT)
+                               runMatrixConverterAndVerify(schema, A, type, 
iinfo, oinfo);
+                       else
+                               runConverterAndVerify(schema, A, type, iinfo, 
oinfo);
+
+               }
+               catch(Exception ex) {
+                       ex.printStackTrace();
+                       throw new RuntimeException(ex);
+               }
+               finally
+               {
+                       DMLScript.rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+               }
+       }
+       
+       /**
+        * 
+        * @param schema
+        * @param A
+        * @param type
+        * @param iinfo
+        * @param oinfo
+        * @param instType
+        */
+       private void runConverterAndVerify( ValueType[] schema, double[][] A, 
ConvType type, InputInfo iinfo, OutputInfo oinfo )
+       {
+               try
+               {
                        //initialize the frame data.
                        List<ValueType> lschema = Arrays.asList(schema);
                        FrameBlock frame1 = new FrameBlock(lschema);
@@ -172,10 +267,10 @@ public class FrameConverterTest extends AutomatedTestBase
                        //write frame data to hdfs
                        FrameWriter writer = 
FrameWriterFactory.createFrameWriter(oinfo);
                        writer.writeFrameToHDFS(frame1, input("A"), rows, 
schema.length);
-
+       
                        //run converter under test
                        MatrixCharacteristics mc = new 
MatrixCharacteristics(rows, schema.length, -1, -1, -1);
-                       runConverter(type, mc, Arrays.asList(schema), 
input("A"), output("B"));
+                       runConverter(type, mc, null, Arrays.asList(schema), 
input("A"), output("B"));
                        
                        //read frame data from hdfs
                        FrameReader reader = 
FrameReaderFactory.createFrameReader(iinfo);
@@ -188,10 +283,73 @@ public class FrameConverterTest extends AutomatedTestBase
                        ex.printStackTrace();
                        throw new RuntimeException(ex);
                }
-               finally
+       }
+       
+       /**
+        * 
+        * @param schema
+        * @param A
+        * @param type
+        * @param iinfo
+        * @param oinfo
+        * @param instType
+        */
+       private void runMatrixConverterAndVerify( ValueType[] schema, 
double[][] A, ConvType type, InputInfo iinfo, OutputInfo oinfo )
+       {
+               try
                {
-                       DMLScript.rtplatform = platformOld;
-                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       MatrixCharacteristics mcMatrix = new 
MatrixCharacteristics(rows, schema.length, 1000, 1000, 0);
+                       MatrixCharacteristics mcFrame = new 
MatrixCharacteristics(rows, schema.length, -1, -1, -1);
+                       
+                       MatrixBlock matrixBlock1 = null;
+                       FrameBlock frame1 = null;
+                       
+                       if(type == ConvType.MAT2BIN) {
+                               //initialize the matrix (dense) data.
+                               matrixBlock1 = new MatrixBlock(rows, 
schema.length, false);
+                               matrixBlock1.init(A, rows, schema.length);
+                               
+                               //write matrix data to hdfs
+                               MatrixWriter matWriter = 
MatrixWriterFactory.createMatrixWriter(oinfo);
+                               matWriter.writeMatrixToHDFS(matrixBlock1, 
input("A"), rows, schema.length, 
+                                               mcMatrix.getRowsPerBlock(), 
mcMatrix.getColsPerBlock(), mcMatrix.getNonZeros());
+                       } 
+                       else {
+                               //initialize the frame data.
+                               List<ValueType> lschema = Arrays.asList(schema);
+                               frame1 = new FrameBlock(lschema);
+                               initFrameData(frame1, A, lschema);
+
+                               //write frame data to hdfs
+                               FrameWriter writer = 
FrameWriterFactory.createFrameWriter(oinfo);
+                               writer.writeFrameToHDFS(frame1, input("A"), 
rows, schema.length);
+                       }
+       
+                       //run converter under test
+                       runConverter(type, mcFrame, mcMatrix, 
Arrays.asList(schema), input("A"), output("B"));
+                       
+                       if(type == ConvType.MAT2BIN) {
+                               //read frame data from hdfs
+                               FrameReader reader = 
FrameReaderFactory.createFrameReader(iinfo);
+                               FrameBlock frame2 = 
reader.readFrameFromHDFS(output("B"), rows, schema.length);
+
+                               //verify input and output frame/matrix
+                               verifyFrameMatrixData(frame2, matrixBlock1);
+                       } 
+                       else { 
+                               //read matrix data from hdfs
+                               MatrixReader matReader = 
MatrixReaderFactory.createMatrixReader(iinfo);
+                               MatrixBlock matrixBlock2 = 
matReader.readMatrixFromHDFS(output("B"), rows, schema.length, 
+                                               mcMatrix.getRowsPerBlock(), 
mcMatrix.getColsPerBlock(), mcMatrix.getNonZeros());
+
+                               //verify input and output frame/matrix
+                               verifyFrameMatrixData(frame1, matrixBlock2);
+                       }
+                       
+               }
+               catch(Exception ex) {
+                       ex.printStackTrace();
+                       throw new RuntimeException(ex);
                }
        }
        
@@ -222,13 +380,30 @@ public class FrameConverterTest extends AutomatedTestBase
                                String val1 = 
UtilFunctions.objectToString(frame1.get(i, j));
                                String val2 = 
UtilFunctions.objectToString(frame2.get(i, j));                           
                                if( UtilFunctions.compareTo(ValueType.STRING, 
val1, val2) != 0)
-                                       Assert.fail("Target value for cell ("+ 
i + "," + j + ") is " + val1 + 
-                                                       ", is not same as 
original value " + val2);
+                                       Assert.fail("The original data for cell 
("+ i + "," + j + ") is " + val1 + 
+                                                       ", not same as the 
converted value " + val2);
                        }
        }
 
+
+       /**
+        * 
+        * @param frame1
+        * @param frame2
+        */
+       private void verifyFrameMatrixData(FrameBlock frame, MatrixBlock 
matrix) {
+               for ( int i=0; i<frame.getNumRows(); i++ )
+                       for( int j=0; j<frame.getNumColumns(); j++ )    {
+                               Object val1 = 
UtilFunctions.doubleToObject(frame.getSchema().get(j),
+                                                               
UtilFunctions.objectToDouble(frame.getSchema().get(j), frame.get(i, j)));
+                               Object val2 = 
UtilFunctions.doubleToObject(frame.getSchema().get(j), matrix.getValue(i, j));
+                               if(( 
UtilFunctions.compareTo(frame.getSchema().get(j), val1, val2)) != 0)
+                                       Assert.fail("Frame value for cell ("+ i 
+ "," + j + ") is " + val1 + 
+                                                       ", is not same as 
matrix value " + val2);
+                       }
+       }
        
-       
+
        /**
         * @param oinfo 
         * @param frame1
@@ -240,7 +415,8 @@ public class FrameConverterTest extends AutomatedTestBase
         */
 
        @SuppressWarnings("unchecked")
-       private void runConverter(ConvType type, MatrixCharacteristics mc, 
List<ValueType> schema, String fnameIn, String fnameOut)
+       private void runConverter(ConvType type, MatrixCharacteristics mc, 
MatrixCharacteristics mcMatrix, 
+                       List<ValueType> schema, String fnameIn, String fnameOut)
                throws DMLRuntimeException, IOException
        {
                SparkExecutionContext sec = (SparkExecutionContext) 
ExecutionContextFactory.createContext();            
@@ -282,6 +458,24 @@ public class FrameConverterTest extends AutomatedTestBase
                                rddOut.saveAsTextFile(fnameOut);
                                break;
                        }
+                       case MAT2BIN: {
+                               InputInfo iinfo = 
InputInfo.BinaryBlockInputInfo;
+                               OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+                               JavaPairRDD<MatrixIndexes,MatrixBlock> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, 
iinfo.inputValueClass);
+                               JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils.matrixBlockToBinaryBlock(sc, rddIn, mcMatrix);
+                               rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+                               break;
+                       }
+                       case BIN2MAT: {
+                               InputInfo iinfo = 
InputInfo.BinaryBlockInputInfo;
+                               OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+                               JavaPairRDD<LongWritable, FrameBlock> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, 
FrameBlock.class);
+                               JavaPairRDD<MatrixIndexes,MatrixBlock> rddOut = 
FrameRDDConverterUtils.binaryBlockToMatrixBlock(rddIn, mc, mcMatrix);
+                               rddOut.saveAsHadoopFile(fnameOut, 
MatrixIndexes.class, MatrixBlock.class, oinfo.outputFormatClass);
+                               break;
+                       }
+                       default: 
+                               throw new RuntimeException("Unsuported 
converter type: "+type.toString());
                }
                
                sec.close();

Reply via email to