Repository: incubator-systemml
Updated Branches:
  refs/heads/master e5aaaf1e8 -> b1b9e838d


[SYSTEMML-560] Frame converter between TextCell and Binary block


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b1b9e838
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b1b9e838
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b1b9e838

Branch: refs/heads/master
Commit: b1b9e838d81d7ade1a62aaa60b18960fbea7f23b
Parents: e5aaaf1
Author: Arvind Surve <[email protected]>
Authored: Sun Apr 24 21:36:59 2016 -0700
Committer: Arvind Surve <[email protected]>
Committed: Sun Apr 24 21:36:59 2016 -0700

----------------------------------------------------------------------
 .../functions/ConvertFrameBlockToIJVLines.java  |  55 +++
 .../spark/utils/FrameRDDConverterUtils.java     | 255 ++++++++++++--
 .../spark/utils/RDDAggregateUtils.java          |  41 ++-
 .../instructions/spark/utils/SparkUtils.java    |  28 ++
 .../runtime/io/FrameReaderBinaryBlock.java      |   5 +-
 .../sysml/runtime/matrix/data/FrameBlock.java   |  43 +++
 .../matrix/mapred/FrameReblockBuffer.java       | 351 +++++++++++++++++++
 .../functions/frame/FrameConverterTest.java     |  55 ++-
 8 files changed, 803 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
new file mode 100644
index 0000000..b8f8e11
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.runtime.instructions.spark.functions;
+
+import java.util.ArrayList;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+import scala.Tuple2;
+
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+
+/**
+ * Spark flat-map function that expands one (row index, frame block) pair
+ * into text-cell lines of the form "row col value".
+ */
+public class ConvertFrameBlockToIJVLines implements FlatMapFunction<Tuple2<LongWritable,FrameBlock>, String> {
+
+       private static final long serialVersionUID = 1803516615963340115L;
+
+       //block sizes handed in by the caller (stored only, not read by call)
+       int brlen;
+       int bclen;
+
+       public ConvertFrameBlockToIJVLines(int brlen, int bclen) {
+               this.brlen = brlen;
+               this.bclen = bclen;
+       }
+
+       /**
+        * Emits one line per non-null cell of the block. The row is the block
+        * key plus the in-block row position; the column is the in-block column
+        * position plus one.
+        * 
+        * @param kv block key and frame block
+        * @return text-cell lines of all non-null cells
+        */
+       @Override
+       public Iterable<String> call(Tuple2<LongWritable, FrameBlock> kv) throws Exception {
+
+               final long rowOffset = kv._1.get();
+               final FrameBlock fb = kv._2;
+               final ArrayList<String> lines = new ArrayList<String>();
+
+               for (int r = 0; r < fb.getNumRows(); r++) {
+                       final long rowIx = rowOffset + r;
+                       for (int c = 0; c < fb.getNumColumns(); c++) {
+                               final Object cell = fb.get(r, c);
+                               //null cells are not written out
+                               if (cell == null)
+                                       continue;
+                               lines.add(rowIx + " " + (c + 1) + " " + cell.toString());
+                       }
+               }
+               return lines;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 342f1df..02134c8 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -19,8 +19,11 @@
 
 package org.apache.sysml.runtime.instructions.spark.utils;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -38,39 +41,21 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
+import 
org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.matrix.mapred.FrameReblockBuffer;
+import org.apache.sysml.runtime.util.FastStringTokenizer;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class FrameRDDConverterUtils 
 {
-       /**
-        * 
-        * @param in
-        * @param mcIn
-        * @param props
-        * @param strict
-        * @return
-        */
-       public static JavaRDD<String> 
binaryBlockToCsv(JavaPairRDD<LongWritable,FrameBlock> in, MatrixCharacteristics 
mcIn, CSVFileFormatProperties props, boolean strict)
-       {
-               //convert input rdd to serializable long/frame block
-               JavaPairRDD<LongWritable,FrameBlock> input = 
-                               in.mapToPair(new LongWritableToSerFunction());
-               
-               //sort if required (on blocks/rows)
-               if( strict ) {
-                       input = input.sortByKey(true);
-               }
-               
-               //convert binary block to csv (from blocks/rows)
-               JavaRDD<String> out = input
-                               .flatMap(new BinaryBlockToCSVFunction(props));
-       
-               return out;
-       }
-       
+       //=====================================
+       // CSV <--> Binary block
+
        /**
         * 
         * @param sc
@@ -135,6 +120,104 @@ public class FrameRDDConverterUtils
        
        /**
         * 
+        * @param in
+        * @param mcIn
+        * @param props
+        * @param strict
+        * @return
+        */
+       public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<LongWritable,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict)
+       {
+               //convert input rdd to serializable long/frame block pairs
+               JavaPairRDD<LongWritable,FrameBlock> keyed = in.mapToPair(new LongWritableToSerFunction());
+               
+               //strict mode: order the blocks by ascending key before emitting lines
+               JavaPairRDD<LongWritable,FrameBlock> ordered = strict ? keyed.sortByKey(true) : keyed;
+               
+               //expand every block into its csv lines
+               return ordered.flatMap(new BinaryBlockToCSVFunction(props));
+       }
+       
+       
+       //=====================================
+       // cellText <--> Binary block
+       /**
+        * Converts a text-cell RDD keyed by LongWritable into a binary-block
+        * frame RDD keyed by LongWritable. The keys are unwrapped to Long, the
+        * conversion itself is delegated to textCellToBinaryBlockLongIndex, and
+        * the resulting keys are wrapped again.
+        * 
+        * @param sc spark context, handed through to the delegate
+        * @param in text-cell input
+        * @param mcOut characteristics of the output frame
+        * @param schema value type per column
+        * @return binary-block frame rdd
+        * @throws DMLRuntimeException declared by the delegate
+        */
+       public static JavaPairRDD<LongWritable, FrameBlock> textCellToBinaryBlock(JavaSparkContext sc,
+                       JavaPairRDD<LongWritable, Text> in, MatrixCharacteristics mcOut, List<ValueType> schema ) 
+               throws DMLRuntimeException  
+       {
+               //unwrap the hadoop keys to plain longs
+               JavaPairRDD<Long,Text> longKeyed = in.mapToPair(new LongWritableTextToLongTextFunction());
+               
+               //actual text cell to binary block conversion
+               JavaPairRDD<Long,FrameBlock> blocks = textCellToBinaryBlockLongIndex(sc, longKeyed, mcOut, schema);
+               
+               //wrap the keys back into hadoop writables
+               return blocks.mapToPair(new LongFrameToLongWritableFrameFunction());
+       }
+
+               
+       /**
+        * Converts a text-cell RDD into a binary-block frame RDD keyed by Long.
+        * Only the values of the input are used; each partition is parsed into
+        * partial frame blocks which are then merged per key.
+        * 
+        * @param sc spark context (not used by this method)
+        * @param input text-cell input; its keys are dropped
+        * @param mcOut characteristics of the output frame
+        * @param schema value type per column
+        * @return binary-block frame rdd with one merged block per key
+        * @throws DMLRuntimeException
+        */
+       public static JavaPairRDD<Long, FrameBlock> textCellToBinaryBlockLongIndex(JavaSparkContext sc,
+                       JavaPairRDD<Long, Text> input, MatrixCharacteristics mcOut, List<ValueType> schema ) 
+               throws DMLRuntimeException  
+       {
+               //per partition: parse text cells into partial frame blocks
+               TextToBinaryBlockFunction cellsToBlocks = new TextToBinaryBlockFunction( mcOut, schema );
+               JavaPairRDD<Long, FrameBlock> partials = input.values().mapPartitionsToPair(cellsToBlocks);
+               
+               //merge the partial frame blocks that share a key
+               return RDDAggregateUtils.mergeByFrameKey( partials );
+       }
+
+               
+       // Useful for printing, testing binary blocked RDD and also for 
external use.
+       public static JavaRDD<String> 
binaryBlockToStringRDD(JavaPairRDD<LongWritable, FrameBlock> input, 
MatrixCharacteristics mcIn, String format) throws DMLRuntimeException {
+               if(format.equals("text")) {
+                       JavaRDD<String> ijv = input.flatMap(new 
ConvertFrameBlockToIJVLines(mcIn.getRowsPerBlock(), mcIn.getColsPerBlock()));
+                       return ijv;
+               }
+               else {
+                       throw new DMLRuntimeException("The output format:" + 
format + " is not implemented yet.");
+               }
+       }
+       
+
+       /////////////////////////////////
+       // CSV-SPECIFIC FUNCTIONS
+       
+       /**
+        * 
         */
        private static class StringToSerTextFunction implements 
PairFunction<String, LongWritable, Text> 
        {
@@ -162,6 +245,36 @@ public class FrameRDDConverterUtils
        /**
         * 
         */
+       public static class LongWritableTextToLongTextFunction implements PairFunction<Tuple2<LongWritable,Text>,Long,Text> 
+       {
+               //re-keys a (LongWritable, Text) pair as (Long, Text); the Text value
+               //is handed through as is
+               
+               private static final long serialVersionUID = -5408386071466175348L;
+
+               @Override
+               public Tuple2<Long, Text> call(Tuple2<LongWritable, Text> arg0) throws Exception  {
+                       //Long.valueOf may reuse cached instances instead of always allocating
+                       return new Tuple2<Long,Text>(Long.valueOf(arg0._1.get()), arg0._2);
+               }
+       }
+       
+       
+       
+       /**
+        * 
+        */
+       public static class LongFrameToLongWritableFrameFunction implements 
PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> 
+       {
+
+               private static final long serialVersionUID = 
-1467314923206783333L;
+
+               @Override
+               public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, 
FrameBlock> arg0) throws Exception  {
+                       return new Tuple2<LongWritable, FrameBlock>(new 
LongWritable(arg0._1), arg0._2);
+               }
+       }
+
+       
+       /**
+        * 
+        */
        private static class TextToStringFunction implements 
Function<Text,String> 
        {
                private static final long serialVersionUID = 
-2744814934501782747L;
@@ -316,4 +429,94 @@ public class FrameRDDConverterUtils
                        return ret;
                }
        }
+       /////////////////////////////////
+       // TEXTCELL-SPECIFIC FUNCTIONS
+       
+       private static abstract class CellToBinaryBlockFunction implements 
Serializable
+       {
+               private static final long serialVersionUID = 
-729614449626680946L;
+
+               //internal buffer size (aligned w/ default matrix block size)
+               protected static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M 
elements (32MB)
+               protected int _bufflen = -1;
+               
+               protected long _rlen = -1;
+               protected long _clen = -1;
+               
+               protected CellToBinaryBlockFunction(MatrixCharacteristics mc)
+               {
+                       _rlen = mc.getRows();
+                       _clen = mc.getCols();
+                       
+                       //determine upper bounded buffer len
+                       _bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE);
+               }
+
+
+               /**
+                * 
+                * @param rbuff
+                * @param ret
+                * @throws IOException 
+                * @throws DMLRuntimeException 
+                */
+               protected void flushBufferToList( FrameReblockBuffer rbuff,  
ArrayList<Tuple2<Long,FrameBlock>> ret ) 
+                       throws IOException, DMLRuntimeException
+               {
+                       //temporary list of indexed matrix values to prevent 
library dependencies
+                       ArrayList<Pair<Long, FrameBlock>> rettmp = new 
ArrayList<Pair<Long, FrameBlock>>();
+                       rbuff.flushBufferToBinaryBlocks(rettmp);
+                       ret.addAll(SparkUtils.fromIndexedFrameBlock(rettmp));
+               }
+       }
+       
+       
+       /**
+        * Partition-wise function that parses text cells ("row col value") and
+        * collects them, via a reblock buffer, into partial frame blocks.
+        */
+       private static class TextToBinaryBlockFunction extends CellToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Text>,Long,FrameBlock> 
+       {
+               private static final long serialVersionUID = -2042208027876880588L;
+               List<ValueType> _schema = null;
+               
+               protected TextToBinaryBlockFunction(MatrixCharacteristics mc, List<ValueType> schema ) {
+                       super(mc);
+                       _schema = schema;
+               }
+
+               @Override
+               public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Text> arg0) 
+                       throws Exception 
+               {
+                       ArrayList<Tuple2<Long,FrameBlock>> blocks = new ArrayList<Tuple2<Long,FrameBlock>>();
+                       FrameReblockBuffer buffer = new FrameReblockBuffer(_bufflen, _rlen, _clen, _schema );
+                       FastStringTokenizer tokens = new FastStringTokenizer(' ');
+                       
+                       while( arg0.hasNext() )
+                       {
+                               String line = arg0.next().toString();
+                               
+                               //lines starting with '%' are matrix market comments and carry no cell
+                               if( !line.startsWith("%") ) 
+                               {
+                                       //parse the ijv triple; the column index is reduced by one to address the schema list
+                                       tokens.reset( line );
+                                       long rowIx = tokens.nextLong();
+                                       long colIx = tokens.nextLong();
+                                       ValueType vt = _schema.get((int)colIx-1);
+                                       Object value = UtilFunctions.stringToObject(vt, tokens.nextToken());
+                                       
+                                       //make room first if the buffer is full, then append the cell
+                                       if( buffer.getSize() >= buffer.getCapacity() )
+                                               flushBufferToList(buffer, blocks);
+                                       buffer.appendCell(rowIx, colIx, value);
+                               }
+                       }
+                       
+                       //flush whatever is left in the buffer
+                       flushBufferToList(buffer, blocks);
+               
+                       return blocks;
+               }
+       }       
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 5665fce..c545c30 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -22,13 +22,13 @@ package org.apache.sysml.runtime.instructions.spark.utils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
-
 import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.spark.data.CorrMatrixBlock;
 import org.apache.sysml.runtime.instructions.spark.data.RowMatrixBlock;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
@@ -776,4 +776,43 @@ public class RDDAggregateUtils
                        return v1 + v2;
                }       
        }
+       
+       /**
+        * Reduces all frame blocks that share a key to a single block per key,
+        * using MergeFrameBlocksFunction as the reduce function.
+        * 
+        * @param in keyed frame blocks
+        * @return rdd holding one merged frame block per key
+        */
+       public static JavaPairRDD<Long, FrameBlock> mergeByFrameKey( JavaPairRDD<Long, FrameBlock> in )
+       {
+               MergeFrameBlocksFunction merger = new MergeFrameBlocksFunction();
+               return in.reduceByKey(merger);
+       }
+       
+       /**
+        * 
+        */
+       private static class MergeFrameBlocksFunction implements 
Function2<FrameBlock, FrameBlock, FrameBlock> 
+       {               
+               private static final long serialVersionUID = 
-8881019027250258850L;
+
+               @Override
+               public FrameBlock call(FrameBlock b1, FrameBlock b2) 
+                       throws Exception 
+               {
+                       // sanity check input dimensions
+                       if (b1.getNumRows() != b2.getNumRows() || 
b1.getNumColumns() != b2.getNumColumns()) {
+                               throw new DMLRuntimeException("Mismatched frame 
block sizes for: "
+                                               + b1.getNumRows() + " " + 
b1.getNumColumns() + " "
+                                               + b2.getNumRows() + " " + 
b2.getNumColumns());
+                       }
+
+                       // execute merge (never pass by reference)
+                       FrameBlock ret = new FrameBlock(b1);
+                       ret.merge(b2);
+                       
+                       return ret;
+               }
+
+       }
+       
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 3c898a6..34db095 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -37,9 +37,11 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
 import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -90,6 +92,32 @@ public class SparkUtils
                return ret;
        }
        
+       
+       /**
+        * 
+        * @param in
+        * @return
+        */
+       public static Tuple2<Long,FrameBlock> fromIndexedFrameBlock( Pair<Long, 
FrameBlock> in ){
+               return new Tuple2<Long, FrameBlock>(in.getKey(), in.getValue());
+       }
+       
+       
+       /**
+        * Converts a list of (key, frame block) pairs into a new list of spark
+        * tuples, preserving the order of the input list.
+        * 
+        * @param in list of indexed frame blocks
+        * @return list of tuples, one per input pair
+        */
+       public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( ArrayList<Pair<Long, FrameBlock>> in )
+       {
+               ArrayList<Tuple2<Long, FrameBlock>> tuples = new ArrayList<Tuple2<Long, FrameBlock>>();
+               for( int k = 0; k < in.size(); k++ ) {
+                       Pair<Long, FrameBlock> entry = in.get(k);
+                       tuples.add(fromIndexedFrameBlock(entry));
+               }
+               
+               return tuples;
+       }
+       
+       
        /**
         * 
         * @param mb

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index dd1b744..534decf 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -93,7 +93,7 @@ public class FrameReaderBinaryBlock extends FrameReader
        private static void readBinaryBlockFrameFromHDFS( Path path, JobConf 
job, FileSystem fs, FrameBlock dest, long rlen, long clen )
                throws IOException, DMLRuntimeException
        {
-               LongWritable key = new LongWritable();
+               LongWritable key = new LongWritable(-1L);
                FrameBlock value = new FrameBlock();
                
                for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
@@ -109,6 +109,9 @@ public class FrameReaderBinaryBlock extends FrameReader
                                        
                                        int rows = value.getNumRows();
                                        int cols = value.getNumColumns();
+
+                                       if(rows == 0 || cols == 0)      //Empty 
block, ignore it.
+                                               continue;
                                        
                                        //bound check per block
                                        if( row_offset + rows < 0 || row_offset 
+ rows > rlen ) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 634e18c..6323fea 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -678,6 +678,49 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
                return map;
        }
 
+       /**
+        * Merges the cells of another frame block of identical dimensions into
+        * this block. A source cell overwrites the target cell only if it holds
+        * something other than the empty value of its column type (null for
+        * strings, false for booleans, 0 for numbers); all other target cells
+        * are left untouched.
+        * 
+        * @param that source block; a null block or a block without rows is ignored
+        * @throws DMLRuntimeException if the dimensions of both blocks differ
+        */
+       public void merge(FrameBlock that ) 
+               throws DMLRuntimeException
+       {
+               //check for empty input source (nothing to merge)
+               if( that == null || that.getNumRows() == 0 )
+                       return;
+               
+               //check dimensions up front, so that no cell is copied on a mismatch
+               if( getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns() )
+                       throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", source="+that.getNumRows()+"x"+that.getNumColumns()+")");
+               
+               //core frame block merge through cell copy; every source cell is read
+               //once and null cells never overwrite the target
+               for( int i=0; i<getNumRows(); i++ ) {
+                       for( int j=0; j<getNumColumns(); j++ ) {
+                               switch( _schema.get(j) ) {
+                                       case STRING: {
+                                               Object val = that.get(i, j);
+                                               if( val != null )
+                                                       set(i, j, val);
+                                               break;
+                                       }
+                                       case BOOLEAN: {
+                                               //plain test for true; Boolean.getBoolean would look up a
+                                               //system property named "false" instead of yielding a constant
+                                               Boolean val = (Boolean) that.get(i, j);
+                                               if( Boolean.TRUE.equals(val) )
+                                                       set(i, j, val);
+                                               break;
+                                       }
+                                       case INT: {
+                                               //explicit null check, unboxing a null cell would fail
+                                               Long val = (Long) that.get(i, j);
+                                               if( val != null && val.longValue() != 0 )
+                                                       set(i, j, val);
+                                               break;
+                                       }
+                                       case DOUBLE: {
+                                               //explicit null check, unboxing a null cell would fail
+                                               Double val = (Double) that.get(i, j);
+                                               if( val != null && val.doubleValue() != 0.0 )
+                                                       set(i, j, val);
+                                               break;
+                                       }
+                                       default: throw new RuntimeException("Unsupported value type: "+_schema.get(j));
+                               }
+                       }
+               }
+               
+       }
+
        
        ///////
        // row iterators (over strings and boxed objects)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
new file mode 100644
index 0000000..9b2cd37
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysml.runtime.matrix.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
+
+/**
+ * Buffer of individual frame cells (row index, column index, value) that is
+ * sorted and converted into row-wise {@link FrameBlock}s when flushed.
+ * <p>
+ * The index arithmetic treats row and column indexes as 1-based. A block
+ * covers {@code _brlen} consecutive rows, where {@code _brlen} is derived from
+ * the buffer capacity and the number of columns.
+ */
+public class FrameReblockBuffer
+{
+
+       //default buffer size: 5M cells; every slot is a pre-allocated FrameCell
+       //(two ints and one object reference, plus the referenced value object)
+       public static final int DEFAULT_BUFFER_SIZE = 5000000;
+
+       //capacity of the buffer and number of currently buffered cells
+       private final int _bufflen;
+       private int _count;
+
+       //pre-allocated cells, reused across flushes
+       private final FrameCell[] _buff;
+
+       //frame dimensions and block sizes (rows per block, columns per block)
+       private final long _rlen;
+       private final long _clen;
+       private final int _brlen;
+       private final int _bclen;
+
+       //schema passed to the blocks created in flushBufferToBinaryBlocks
+       private final List<ValueType> _schema;
+
+
+       /**
+        * Creates a buffer with the default capacity {@link #DEFAULT_BUFFER_SIZE}.
+        *
+        * @param rlen number of rows of the frame
+        * @param clen number of columns of the frame
+        * @param schema value types handed to the created frame blocks
+        */
+       public FrameReblockBuffer( long rlen, long clen, List<ValueType> schema )
+       {
+               this( DEFAULT_BUFFER_SIZE, rlen, clen, schema );
+       }
+
+       /**
+        * Creates a buffer with the given capacity and pre-allocates all cells.
+        *
+        * @param buffersize capacity of the buffer in number of cells
+        * @param rlen number of rows of the frame
+        * @param clen number of columns of the frame
+        * @param schema value types handed to the created frame blocks
+        */
+       public FrameReblockBuffer( int buffersize, long rlen, long clen, List<ValueType> schema )
+       {
+               _bufflen = buffersize;
+               _count = 0;
+
+               //allocate all cells once; appendCell only overwrites their fields
+               _buff = new FrameCell[ _bufflen ];
+               for( int i=0; i<_bufflen; i++ )
+                       _buff[i] = new FrameCell();
+
+               _rlen = rlen;
+               _clen = clen;
+
+               //rows per block: capacity divided by the number of columns, at least 1;
+               //a non-positive column count yields 1 (clen==0 used to fail with a
+               //division by zero, negative values already resulted in 1)
+               _brlen = (_clen > 0) ? Math.max((int)(_bufflen/_clen), 1) : 1;
+               _bclen = 1;
+
+               _schema = schema;
+       }
+
+
+       /**
+        * Maps a 1-based cell index to its 1-based block index.
+        * <p>
+        * NOTE: {@code blen} is currently ignored; the computation always uses
+        * {@code _brlen}, also when called for column indexes.
+        *
+        * @param ix 1-based row or column index
+        * @param blen block length (unused)
+        * @return 1-based block index
+        */
+       private long getBlockIndex( long ix, int blen )
+       {
+               return (ix-1)/_brlen+1;
+       }
+
+       /**
+        * Maps a 1-based cell index to its 0-based position inside its block.
+        * <p>
+        * NOTE: {@code blen} is currently ignored; the computation always uses
+        * {@code _brlen}, also when called for column indexes.
+        *
+        * @param ix 1-based row or column index
+        * @param blen block length (unused)
+        * @return 0-based index within the block
+        */
+       private int getIndexInBlock( long ix, int blen )
+       {
+               return (int)((ix-1)%_brlen);
+       }
+
+       /**
+        * @return number of cells currently held in the buffer
+        */
+       public int getSize()
+       {
+               return _count;
+       }
+
+       /**
+        * @return maximum number of cells the buffer can hold
+        */
+       public int getCapacity()
+       {
+               return _bufflen;
+       }
+
+       /**
+        * Appends a single cell to the buffer. No capacity check is done here;
+        * callers have to flush once {@link #getSize()} reaches
+        * {@link #getCapacity()} (as done in appendBlock).
+        *
+        * @param r row index (narrowed to int)
+        * @param c column index (narrowed to int)
+        * @param obj cell value
+        */
+       public void appendCell( long r, long c, Object obj )
+       {
+               FrameCell cell = _buff[_count];
+               cell.setRow((int)r);
+               cell.setCol((int)c);
+               cell.setObjVal(obj);
+               _count++;
+       }
+
+       /**
+        * Appends all non-null cells of the given block, shifted by the given
+        * offsets, and flushes the buffer to {@code out} whenever it is full.
+        *
+        * @param r_offset row index assigned to row 0 of {@code inBlk}
+        * @param c_offset column index assigned to column 0 of {@code inBlk}
+        * @param inBlk input block whose cells are appended
+        * @param out collector that receives the blocks of intermediate flushes
+        * @throws IOException if flushing the buffer fails
+        */
+       public void appendBlock(long r_offset, long c_offset, FrameBlock inBlk, OutputCollector<Long, Writable> out )
+               throws IOException
+       {
+               int nrow = inBlk.getNumRows();
+               int ncol = inBlk.getNumColumns();
+               for( int i=0; i<nrow; i++ )
+                       for( int j=0; j<ncol; j++ )
+                       {
+                               Object obj = inBlk.get(i, j);
+                               if( obj == null )
+                                       continue; //null cells are not buffered
+
+                               appendCell(r_offset+i, c_offset+j, obj);
+
+                               //check and flush if required
+                               if( _count == _bufflen )
+                                       flushBuffer(out);
+                       }
+       }
+
+
+       /**
+        * Sorts the buffered cells, assembles them into blocks and emits every
+        * block to the given collector, keyed by its block row index. Afterwards
+        * the buffer is empty.
+        * <p>
+        * NOTE(review): this variant creates the block without schema and uses the
+        * block index as key, whereas flushBufferToBinaryBlocks uses the schema
+        * and the first row index of the block as key - confirm this is intended.
+        *
+        * @param out collector that receives the (key, block) pairs
+        * @throws IOException if the collector fails
+        */
+       public void flushBuffer( OutputCollector<Long, Writable> out )
+               throws IOException
+       {
+               if( _count == 0 )
+                       return;
+
+               //Step 1) sort reblock buffer by cell (row, column) index
+               Arrays.sort( _buff, 0, _count, new FrameReblockBufferComparator() );
+
+               //Step 2) output blocks
+               Long tmpIx = -1L;
+               //create intermediate block, reset and reused for every output block
+               FrameBlock tmpBlock = new FrameBlock();
+
+               //put values into block and output
+               long cbi = -1, cbj = -1; //current block indexes
+               for( int i=0; i<_count; i++ )
+               {
+                       long bi = getBlockIndex(_buff[i].getRow(), _brlen);
+                       long bj = getBlockIndex(_buff[i].getCol(), _bclen);
+
+                       //output block and switch to next index pair
+                       if( bi != cbi || bj != cbj ) {
+                               outputBlock(out, tmpIx, tmpBlock);
+                               cbi = bi;
+                               cbj = bj;
+                               tmpIx = bi;
+                               //the last block may have fewer than _brlen rows
+                               tmpBlock.reset(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen)));
+                       }
+
+                       int ci = getIndexInBlock(_buff[i].getRow(), _brlen);
+                       int cj = getIndexInBlock(_buff[i].getCol(), _bclen);
+                       tmpBlock.set(ci, cj, _buff[i].getObjVal());
+               }
+
+               //output last block
+               outputBlock(out, tmpIx, tmpBlock);
+
+               _count = 0;
+       }
+
+       /**
+        * Sorts the buffered cells, assembles them into blocks created with the
+        * buffer's schema and appends every block to {@code outList}, keyed by the
+        * 1-based index of the first row of the block. Afterwards the buffer is
+        * empty.
+        *
+        * @param outList list that receives the (first row index, block) pairs
+        * @throws IOException declared for callers; not thrown by the visible code
+        * @throws DMLRuntimeException declared for callers; not thrown by the visible code
+        */
+       public void flushBufferToBinaryBlocks( ArrayList<Pair<Long, FrameBlock>> outList )
+               throws IOException, DMLRuntimeException
+       {
+               if( _count == 0 )
+                       return;
+
+               //Step 1) sort reblock buffer by cell (row, column) index
+               Arrays.sort( _buff, 0, _count, new FrameReblockBufferComparator() );
+
+               //Step 2) output blocks
+               Long tmpIx = -1L;
+               FrameBlock tmpBlock = new FrameBlock(_schema);
+
+               //put values into block and output
+               long cbi = -1, cbj = -1; //current block indexes
+               for( int i=0; i<_count; i++ )
+               {
+                       long bi = getBlockIndex(_buff[i].getRow(), _brlen);
+                       long bj = getBlockIndex(_buff[i].getCol(), _bclen);
+
+                       //output block and switch to next index pair
+                       if( bi != cbi || bj != cbj ) {
+                               if( cbi != -1 && cbj != -1 )
+                                       outputBlock(outList, tmpIx, tmpBlock);
+                               cbi = bi;
+                               cbj = bj;
+                               tmpIx = (bi-1)*_brlen+1;
+                               //fresh block per output, since the list keeps a reference to it;
+                               //the last block may have fewer than _brlen rows
+                               tmpBlock = new FrameBlock(_schema);
+                               tmpBlock.ensureAllocatedColumns(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen)));
+                       }
+
+                       int ci = getIndexInBlock(_buff[i].getRow(), _brlen);
+                       int cj = getIndexInBlock(_buff[i].getCol(), _bclen);
+                       tmpBlock.set(ci, cj, _buff[i].getObjVal());
+               }
+
+               //output last block
+               if( cbi != -1 && cbj != -1 )
+                       outputBlock(outList, tmpIx, tmpBlock);
+
+               _count = 0;
+       }
+
+       /**
+        * Emits the given block to the collector unless the key is the
+        * "unassigned" marker -1.
+        *
+        * @param out collector that receives the block
+        * @param key block key, -1 if no block has been assigned yet
+        * @param block block to emit
+        * @throws IOException if the collector fails
+        */
+       private static void outputBlock( OutputCollector<Long, Writable> out, Long key, FrameBlock block )
+               throws IOException
+       {
+               //skip output of unassigned blocks
+               if( key == -1 )
+                       return;
+
+               //output block
+               out.collect(key, block);
+       }
+
+       /**
+        * Appends the given block to the output list unless the key is the
+        * "unassigned" marker -1.
+        *
+        * @param out list that receives the (key, block) pair
+        * @param key block key, -1 if no block has been assigned yet
+        * @param value block to append
+        * @throws IOException declared for callers; not thrown by the visible code
+        * @throws DMLRuntimeException declared for callers; not thrown by the visible code
+        */
+       private static void outputBlock( ArrayList<Pair<Long, FrameBlock>> out, Long key, FrameBlock value )
+               throws IOException, DMLRuntimeException
+       {
+               //skip output of unassigned blocks
+               if( key == -1 )
+                       return;
+
+               //output block (key is already boxed, no need to re-box it)
+               out.add(new Pair<Long, FrameBlock>(key, value));
+       }
+
+       /**
+        * Mutable holder of a single buffered cell: row index, column index and
+        * value.
+        */
+       private static class FrameCell
+       {
+               private int iRow;
+               private int iCol;
+               private Object objVal;
+               public int getRow() {
+                       return iRow;
+               }
+               public void setRow(int iRow) {
+                       this.iRow = iRow;
+               }
+               public int getCol() {
+                       return iCol;
+               }
+               public void setCol(int iCol) {
+                       this.iCol = iCol;
+               }
+               public Object getObjVal() {
+                       return objVal;
+               }
+               public void setObjVal(Object objVal) {
+                       this.objVal = objVal;
+               }
+       }
+
+       /**
+        * Comparator to sort the reblock buffer by cell indexes: ascending by row
+        * index and, for equal rows, ascending by column index.
+        */
+       private static class FrameReblockBufferComparator implements Comparator<FrameCell>
+       {
+               @Override
+               public int compare(FrameCell arg0, FrameCell arg1)
+               {
+                       long r0 = arg0.getRow();
+                       long c0 = arg0.getCol();
+                       long r1 = arg1.getRow();
+                       long c1 = arg1.getCol();
+
+                       if( r0 < r1 || (r0 == r1 && c0 < c1) )
+                               return -1;
+                       return ( r0 == r1 && c0 == c1 ) ? 0 : 1;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index 7c0d4f0..05d752c 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -64,6 +65,8 @@ public class FrameConverterTest extends AutomatedTestBase
        private enum ConvType {
                CSV2BIN,
                BIN2CSV,
+               TXTCELL2BIN, //text cell input converted to binary block (see runConverter)
+               BIN2TXTCELL  //binary block input converted to a text RDD (see runConverter)
        }
        
        @Override
@@ -92,6 +95,26 @@ public class FrameConverterTest extends AutomatedTestBase
                runFrameConverterTest(schemaMixed, ConvType.BIN2CSV);
        }
 
+       /** Runs the converter test for ConvType.TXTCELL2BIN with the schemaStrings schema. */
+       @Test
+       public void testFrameStringsTxtCellBinSpark()  {
+               runFrameConverterTest(schemaStrings, ConvType.TXTCELL2BIN);
+       }
+       
+       /** Runs the converter test for ConvType.TXTCELL2BIN with the schemaMixed schema. */
+       @Test
+       public void testFrameMixedTxtCellBinSpark()  {
+               runFrameConverterTest(schemaMixed, ConvType.TXTCELL2BIN);
+       }
+       
+       /** Runs the converter test for ConvType.BIN2TXTCELL with the schemaStrings schema. */
+       @Test
+       public void testFrameStringsBinTxtCellSpark()  {
+               runFrameConverterTest(schemaStrings, ConvType.BIN2TXTCELL);
+       }
+       
+       /** Runs the converter test for ConvType.BIN2TXTCELL with the schemaMixed schema. */
+       @Test
+       public void testFrameMixedBinTxtCellSpark()  {
+               runFrameConverterTest(schemaMixed, ConvType.BIN2TXTCELL);
+       }
+
        
        /**
         * 
@@ -106,6 +129,9 @@ public class FrameConverterTest extends AutomatedTestBase
                boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
                DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 
+               SparkConf conf = new 
SparkConf().setAppName("Frame").setMaster("local");
+               conf.set("spark.kryo.classesToRegister", 
"org.apache.hadoop.io.LongWritable");
+
                try
                {
                        TestConfiguration config = 
getTestConfiguration(TEST_NAME);
@@ -126,6 +152,14 @@ public class FrameConverterTest extends AutomatedTestBase
                                        oinfo = 
OutputInfo.BinaryBlockOutputInfo;
                                        iinfo = InputInfo.CSVInputInfo;
                                        break;
+                               case TXTCELL2BIN:
+                                       oinfo = OutputInfo.TextCellOutputInfo;
+                                       iinfo = InputInfo.BinaryBlockInputInfo;
+                                       break;
+                               case BIN2TXTCELL:
+                                       oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+                                       iinfo = InputInfo.TextCellInputInfo;
+                                       break;
                                default: 
                                        throw new RuntimeException("Unsuported 
converter type: "+type.toString());
                        }
@@ -141,7 +175,7 @@ public class FrameConverterTest extends AutomatedTestBase
 
                        //run converter under test
                        MatrixCharacteristics mc = new 
MatrixCharacteristics(rows, schema.length, -1, -1, -1);
-                       runConverter(type, mc, input("A"), output("B"));
+                       runConverter(type, mc, Arrays.asList(schema), 
input("A"), output("B"));
                        
                        //read frame data from hdfs
                        FrameReader reader = 
FrameReaderFactory.createFrameReader(iinfo);
@@ -200,12 +234,13 @@ public class FrameConverterTest extends AutomatedTestBase
         * @param frame1
         * @param frame2
         * @param fprop
+        * @param schema
         * @return 
         * @throws DMLRuntimeException, IOException
         */
 
        @SuppressWarnings("unchecked")
-       private void runConverter(ConvType type, MatrixCharacteristics mc, 
String fnameIn, String fnameOut)
+       private void runConverter(ConvType type, MatrixCharacteristics mc, 
List<ValueType> schema, String fnameIn, String fnameOut)
                throws DMLRuntimeException, IOException
        {
                SparkExecutionContext sec = (SparkExecutionContext) 
ExecutionContextFactory.createContext();            
@@ -231,6 +266,22 @@ public class FrameConverterTest extends AutomatedTestBase
                                rddOut.saveAsTextFile(fnameOut);
                                break;
                        }
+                       case TXTCELL2BIN: {
+                               InputInfo iinfo = InputInfo.TextCellInputInfo;
+                               OutputInfo oinfo = 
OutputInfo.BinaryBlockOutputInfo;
+                               JavaPairRDD<LongWritable,Text> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, 
iinfo.inputValueClass);
+                               JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils
+                                               .textCellToBinaryBlock(sc, 
rddIn, mc, schema);
+                               rddOut.saveAsHadoopFile(fnameOut, 
LongWritable.class, FrameBlock.class, oinfo.outputFormatClass);
+                               break;
+                       }
+                       case BIN2TXTCELL: {
+                               InputInfo iinfo = 
InputInfo.BinaryBlockInputInfo;
+                               JavaPairRDD<LongWritable, FrameBlock> rddIn = 
sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, 
FrameBlock.class);
+                               JavaRDD<String> rddOut = 
FrameRDDConverterUtils.binaryBlockToStringRDD(rddIn, mc, "text");
+                               rddOut.saveAsTextFile(fnameOut);
+                               break;
+                       }
                }
                
                sec.close();

Reply via email to