Repository: incubator-systemml Updated Branches: refs/heads/master e5aaaf1e8 -> b1b9e838d
[SYSTEMML-560] Frame converter between TextCell and Binary block Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b1b9e838 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b1b9e838 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b1b9e838 Branch: refs/heads/master Commit: b1b9e838d81d7ade1a62aaa60b18960fbea7f23b Parents: e5aaaf1 Author: Arvind Surve <[email protected]> Authored: Sun Apr 24 21:36:59 2016 -0700 Committer: Arvind Surve <[email protected]> Committed: Sun Apr 24 21:36:59 2016 -0700 ---------------------------------------------------------------------- .../functions/ConvertFrameBlockToIJVLines.java | 55 +++ .../spark/utils/FrameRDDConverterUtils.java | 255 ++++++++++++-- .../spark/utils/RDDAggregateUtils.java | 41 ++- .../instructions/spark/utils/SparkUtils.java | 28 ++ .../runtime/io/FrameReaderBinaryBlock.java | 5 +- .../sysml/runtime/matrix/data/FrameBlock.java | 43 +++ .../matrix/mapred/FrameReblockBuffer.java | 351 +++++++++++++++++++ .../functions/frame/FrameConverterTest.java | 55 ++- 8 files changed, 803 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java new file mode 100644 index 0000000..b8f8e11 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/functions/ConvertFrameBlockToIJVLines.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sysml.runtime.instructions.spark.functions; + +import java.util.ArrayList; +import org.apache.hadoop.io.LongWritable; +import org.apache.spark.api.java.function.FlatMapFunction; + +import scala.Tuple2; + +import org.apache.sysml.runtime.matrix.data.FrameBlock; + +public class ConvertFrameBlockToIJVLines implements FlatMapFunction<Tuple2<LongWritable,FrameBlock>, String> { + + private static final long serialVersionUID = 1803516615963340115L; + + int brlen; int bclen; + public ConvertFrameBlockToIJVLines(int brlen, int bclen) { + this.brlen = brlen; + this.bclen = bclen; + } + + @Override + public Iterable<String> call(Tuple2<LongWritable, FrameBlock> kv) throws Exception { + + long lRowIndex = kv._1.get(); + FrameBlock block = kv._2; + + ArrayList<String> cells = new ArrayList<String>(); + + for (int i=0; i<block.getNumRows(); ++i) + for (int j=0; j<block.getNumColumns(); ++j) { + Object obj = block.get(i, j); + if(obj != null) + cells.add(lRowIndex+i+" "+(j+1)+" "+obj.toString()); + } + return cells; + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 342f1df..02134c8 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -19,8 +19,11 @@ package org.apache.sysml.runtime.instructions.spark.utils; +import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -38,39 +41,21 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable; import org.apache.sysml.runtime.instructions.spark.data.SerText; +import org.apache.sysml.runtime.instructions.spark.functions.ConvertFrameBlockToIJVLines; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.matrix.mapred.FrameReblockBuffer; +import org.apache.sysml.runtime.util.FastStringTokenizer; +import org.apache.sysml.runtime.util.UtilFunctions; public class FrameRDDConverterUtils { - /** - * - * @param in - * @param mcIn - * @param props - * @param strict - * @return - */ - public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<LongWritable,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) - { - //convert input rdd to serializable long/frame block - JavaPairRDD<LongWritable,FrameBlock> input = - in.mapToPair(new LongWritableToSerFunction()); - - //sort if required (on blocks/rows) - if( strict ) { - input = input.sortByKey(true); - } - - //convert binary block to csv (from blocks/rows) - JavaRDD<String> out = input - .flatMap(new BinaryBlockToCSVFunction(props)); - - return out; - } - + //===================================== + // CSV <--> Binary block + /** * * @param sc @@ -135,6 +120,104 @@ public class FrameRDDConverterUtils /** * + * @param in + * @param mcIn + * @param props + * @param strict + * @return + */ + public static JavaRDD<String> binaryBlockToCsv(JavaPairRDD<LongWritable,FrameBlock> in, MatrixCharacteristics mcIn, CSVFileFormatProperties props, boolean strict) + { + //convert input rdd to serializable long/frame block + JavaPairRDD<LongWritable,FrameBlock> input = + in.mapToPair(new LongWritableToSerFunction()); + + //sort if required (on blocks/rows) + if( strict ) { + input = input.sortByKey(true); + } + + //convert binary block to csv (from blocks/rows) + JavaRDD<String> out = input + .flatMap(new BinaryBlockToCSVFunction(props)); + + return out; + } + + + //===================================== + // cellText <--> Binary block + /** + * + * @param sc + * @param input + * @param mcOut + * @param schema + * @return + * @throws DMLRuntimeException + */ + public static JavaPairRDD<LongWritable, FrameBlock> textCellToBinaryBlock(JavaSparkContext sc, + JavaPairRDD<LongWritable, Text> in, MatrixCharacteristics mcOut, List<ValueType> schema ) + throws DMLRuntimeException + { + + //convert input rdd to serializable long/frame block + JavaPairRDD<Long,Text> input = + in.mapToPair(new LongWritableTextToLongTextFunction()); + + //Do actual conversion + JavaPairRDD<Long,FrameBlock> output = textCellToBinaryBlockLongIndex(sc, input, mcOut, schema); + + //convert input rdd to serializable long/frame block + JavaPairRDD<LongWritable,FrameBlock> out = + output.mapToPair(new LongFrameToLongWritableFrameFunction()); + + return out; + } + + + /** + * + * @param sc + * @param input + * @param mcOut + * @param schema + * @return + * @throws DMLRuntimeException + */ + public static JavaPairRDD<Long, FrameBlock> textCellToBinaryBlockLongIndex(JavaSparkContext sc, + JavaPairRDD<Long, Text> input, MatrixCharacteristics mcOut, List<ValueType> schema ) + throws DMLRuntimeException + { + + //convert textcell rdd to binary block rdd (w/ partial blocks) + JavaPairRDD<Long, FrameBlock> output = input.values().mapPartitionsToPair(new TextToBinaryBlockFunction( mcOut, schema )); + + //aggregate partial matrix blocks + JavaPairRDD<Long,FrameBlock> out = + RDDAggregateUtils.mergeByFrameKey( output ); + + return out; + } + + + // Useful for printing, testing binary blocked RDD and also for external use. + public static JavaRDD<String> binaryBlockToStringRDD(JavaPairRDD<LongWritable, FrameBlock> input, MatrixCharacteristics mcIn, String format) throws DMLRuntimeException { + if(format.equals("text")) { + JavaRDD<String> ijv = input.flatMap(new ConvertFrameBlockToIJVLines(mcIn.getRowsPerBlock(), mcIn.getColsPerBlock())); + return ijv; + } + else { + throw new DMLRuntimeException("The output format:" + format + " is not implemented yet."); + } + } + + + ///////////////////////////////// + // CSV-SPECIFIC FUNCTIONS + + /** + * */ private static class StringToSerTextFunction implements PairFunction<String, LongWritable, Text> { @@ -162,6 +245,36 @@ public class FrameRDDConverterUtils /** * */ + public static class LongWritableTextToLongTextFunction implements PairFunction<Tuple2<LongWritable,Text>,Long,Text> + { + private static final long serialVersionUID = -5408386071466175348L; + + @Override + public Tuple2<Long, Text> call(Tuple2<LongWritable, Text> arg0) throws Exception { + return new Tuple2<Long,Text>(new Long(arg0._1.get()), arg0._2); + } + } + + + + /** + * + */ + public static class LongFrameToLongWritableFrameFunction implements PairFunction<Tuple2<Long,FrameBlock>,LongWritable,FrameBlock> + { + + private static final long serialVersionUID = -1467314923206783333L; + + @Override + public Tuple2<LongWritable, FrameBlock> call(Tuple2<Long, FrameBlock> arg0) throws Exception { + return new Tuple2<LongWritable, FrameBlock>(new LongWritable(arg0._1), arg0._2); + } + } + + + /** + * + */ private static class TextToStringFunction implements Function<Text,String> { private static final long serialVersionUID = -2744814934501782747L; @@ -316,4 +429,94 @@ public class FrameRDDConverterUtils return ret; } } + ///////////////////////////////// + // TEXTCELL-SPECIFIC FUNCTIONS + + private static abstract class CellToBinaryBlockFunction implements Serializable + { + private static final long serialVersionUID = -729614449626680946L; + + //internal buffer size (aligned w/ default matrix block size) + protected static final int BUFFER_SIZE = 4 * 1000 * 1000; //4M elements (32MB) + protected int _bufflen = -1; + + protected long _rlen = -1; + protected long _clen = -1; + + protected CellToBinaryBlockFunction(MatrixCharacteristics mc) + { + _rlen = mc.getRows(); + _clen = mc.getCols(); + + //determine upper bounded buffer len + _bufflen = (int) Math.min(_rlen*_clen, BUFFER_SIZE); + } + + + /** + * + * @param rbuff + * @param ret + * @throws IOException + * @throws DMLRuntimeException + */ + protected void flushBufferToList( FrameReblockBuffer rbuff, ArrayList<Tuple2<Long,FrameBlock>> ret ) + throws IOException, DMLRuntimeException + { + //temporary list of indexed matrix values to prevent library dependencies + ArrayList<Pair<Long, FrameBlock>> rettmp = new ArrayList<Pair<Long, FrameBlock>>(); + rbuff.flushBufferToBinaryBlocks(rettmp); + ret.addAll(SparkUtils.fromIndexedFrameBlock(rettmp)); + } + } + + + /** + * + */ + private static class TextToBinaryBlockFunction extends CellToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Text>,Long,FrameBlock> + { + private static final long serialVersionUID = -2042208027876880588L; + List<ValueType> _schema = null; + + protected TextToBinaryBlockFunction(MatrixCharacteristics mc, List<ValueType> schema ) { + super(mc); + _schema = schema; + } + + @Override + public Iterable<Tuple2<Long, FrameBlock>> call(Iterator<Text> arg0) + throws Exception + { + ArrayList<Tuple2<Long,FrameBlock>> ret = new ArrayList<Tuple2<Long,FrameBlock>>(); + FrameReblockBuffer rbuff = new FrameReblockBuffer(_bufflen, _rlen, _clen, _schema ); + FastStringTokenizer st = new FastStringTokenizer(' '); + + while( arg0.hasNext() ) + { + //get input string (ignore matrix market comments) + String strVal = arg0.next().toString(); + if( strVal.startsWith("%") ) + continue; + + //parse input ijv triple + st.reset( strVal ); + long row = st.nextLong(); + long col = st.nextLong(); + Object val = UtilFunctions.stringToObject(_schema.get((int)col-1), st.nextToken()); + + //flush buffer if necessary + if( rbuff.getSize() >= rbuff.getCapacity() ) + flushBufferToList(rbuff, ret); + + //add value to reblock buffer + rbuff.appendCell(row, col, val); + } + + //final flush buffer + flushBufferToList(rbuff, ret); + + return ret; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java index 5665fce..c545c30 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java @@ -22,13 +22,13 @@ package org.apache.sysml.runtime.instructions.spark.utils; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; - import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.functionobjects.KahanPlus; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.instructions.spark.data.CorrMatrixBlock; import org.apache.sysml.runtime.instructions.spark.data.RowMatrixBlock; +import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues; @@ -776,4 +776,43 @@ public class RDDAggregateUtils return v1 + v2; } } + + /** + * @param: in + * @return: + */ + public static JavaPairRDD<Long, FrameBlock> mergeByFrameKey( JavaPairRDD<Long, FrameBlock> in ) + { + return in.reduceByKey( + new MergeFrameBlocksFunction()); + } + + /** + * + */ + private static class MergeFrameBlocksFunction implements Function2<FrameBlock, FrameBlock, FrameBlock> + { + private static final long serialVersionUID = -8881019027250258850L; + + @Override + public FrameBlock call(FrameBlock b1, FrameBlock b2) + throws Exception + { + // sanity check input dimensions + if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) { + throw new DMLRuntimeException("Mismatched frame block sizes for: " + + b1.getNumRows() + " " + b1.getNumColumns() + " " + + b2.getNumRows() + " " + b2.getNumColumns()); + } + + // execute merge (never pass by reference) + FrameBlock ret = new FrameBlock(b1); + ret.merge(b2); + + return ret; + } + + } + + } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java index 3c898a6..34db095 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java @@ -37,9 +37,11 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction; import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockFunction; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.util.UtilFunctions; @@ -90,6 +92,32 @@ public class SparkUtils return ret; } + + /** + * + * @param in + * @return + */ + public static Tuple2<Long,FrameBlock> fromIndexedFrameBlock( Pair<Long, FrameBlock> in ){ + return new Tuple2<Long, FrameBlock>(in.getKey(), in.getValue()); + } + + + /** + * + * @param in + * @return + */ + public static ArrayList<Tuple2<Long,FrameBlock>> fromIndexedFrameBlock( ArrayList<Pair<Long, FrameBlock>> in ) + { + ArrayList<Tuple2<Long, FrameBlock>> ret = new ArrayList<Tuple2<Long, FrameBlock>>(); + for( Pair<Long, FrameBlock> ifv : in ) + ret.add(fromIndexedFrameBlock(ifv)); + + return ret; + } + + /** * * @param mb http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java index dd1b744..534decf 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java @@ -93,7 +93,7 @@ public class FrameReaderBinaryBlock extends FrameReader private static void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen ) throws IOException, DMLRuntimeException { - LongWritable key = new LongWritable(); + LongWritable key = new LongWritable(-1L); FrameBlock value = new FrameBlock(); for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files @@ -109,6 +109,9 @@ public class FrameReaderBinaryBlock extends FrameReader int rows = value.getNumRows(); int cols = value.getNumColumns(); + + if(rows == 0 || cols == 0) //Empty block, ignore it. + continue; //bound check per block if( row_offset + rows < 0 || row_offset + rows > rlen ) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 634e18c..6323fea 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -678,6 +678,49 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable return map; } + /** + * + * @param that + * @throws DMLRuntimeException + */ + public void merge(FrameBlock that ) + throws DMLRuntimeException + { + //check for empty input source (nothing to merge) + if( that == null || that.getNumRows() == 0 ) + return; + + //check dimensions (before potentially copy to prevent implicit dimension change) + if( getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns() ) + throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target="+getNumRows()+"x"+getNumColumns()+", source="+that.getNumRows()+"x"+that.getNumColumns()+")"); + + //core frame block merge through cell copy + for( int i=0; i<getNumRows(); i++ ) { + for( int j=0; j<getNumColumns(); j++ ) { + switch( _schema.get(j) ) { + case STRING: + if (that.get(i,j) != null) + set(i,j,that.get(i, j)); + break; + case BOOLEAN: + if ((Boolean)that.get(i,j) != Boolean.getBoolean("false")) + set(i,j,that.get(i, j)); + break; + case INT: + if ((Long)that.get(i,j) != 0) + set(i,j,that.get(i, j)); + break; + case DOUBLE: + if ((Double)that.get(i,j) != 0.0) + set(i,j,that.get(i, j)); + break; + default: throw new RuntimeException("Unsupported value type: "+_schema.get(j)); + } + } + } + + } + /////// // row iterators (over strings and boxed objects) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java new file mode 100644 index 0000000..9b2cd37 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/FrameReblockBuffer.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.sysml.runtime.matrix.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.Pair; + +/** + * + * + */ +public class FrameReblockBuffer +{ + + //default buffer size: 5M -> 5M * 3x8B = 120MB + public static final int DEFAULT_BUFFER_SIZE = 5000000; + + private int _bufflen = -1; + private int _count = -1; + + private FrameCell[] _buff = null; + + private long _rlen = -1; + private long _clen = -1; + private int _brlen = -1; + private int _bclen = -1; + + private List<ValueType> _schema; + + + /** + * @param rlen + * @param clen + * @param schema + * @return + * + */ + public FrameReblockBuffer( long rlen, long clen, List<ValueType> schema ) + { + this( DEFAULT_BUFFER_SIZE, rlen, clen, schema ); + } + + /** + * @param buffersize + * @param rlen + * @param clen + * @param schema + * @return + * + */ + public FrameReblockBuffer( int buffersize, long rlen, long clen, List<ValueType> schema ) + { + _bufflen = buffersize; + _count = 0; + + _buff = new FrameCell[ _bufflen ]; + for(int i=0; i< _bufflen; i++) + _buff[i] = new FrameCell(); + + _rlen = rlen; + _clen = clen; + _brlen = Math.max((int)(_bufflen/_clen), 1); + _bclen = 1; + + _schema = schema; + } + + + /** + * + * @param ix + * @param blen + * @return + */ + private long getBlockIndex( long ix, int blen ) + { + return (ix-1)/_brlen+1; + } + + /** + * + * @param ix + * @param blen + * @return + */ + private int getIndexInBlock( long ix, int blen ) + { + return (int)((ix-1)%_brlen); + } + + public int getSize() + { + return _count; + } + + public int getCapacity() + { + return _bufflen; + } + + /** + * + * @param r + * @param c + * @param obj + */ + public void appendCell( long r, long c, Object obj ) + { + _buff[_count].setRow((int)r); + _buff[_count].setCol((int)c); + _buff[_count].setObjVal(obj); + _count++; + } + + /** + * + * @param r_offset + * @param c_offset + * @param inBlk + * @param out + * @throws IOException + */ + public void appendBlock(long r_offset, long c_offset, FrameBlock inBlk, OutputCollector<Long, Writable> out ) + throws IOException + { + { + int rlen = inBlk.getNumRows(); + int clen = inBlk.getNumColumns(); + for( int i=0; i<rlen; i++ ) + for( int j=0; j<clen; j++ ) + { + Object obj = inBlk.get(i , j ); + if( obj != null ) + { + appendCell(r_offset+i, c_offset+j, obj); + + //check and flush if required + if( _count ==_bufflen ) + flushBuffer(out); + } + } + } + } + + + /** + * + * @param out + * @throws IOException + */ + public void flushBuffer( OutputCollector<Long, Writable> out ) + throws IOException + { + if( _count == 0 ) + return; + + //Step 1) sort reblock buffer (blockwise, no in-block sorting!) + Arrays.sort( _buff, 0 ,_count, new FrameReblockBufferComparator() ); + + //Step 2) output blocks + Long tmpIx = -1L; + //create intermediate blocks + FrameBlock tmpBlock = new FrameBlock(); + + //put values into block and output + long cbi = -1, cbj = -1; //current block indexes + for( int i=0; i<_count; i++ ) + { + long bi = getBlockIndex(_buff[i].getRow(), _brlen); + long bj = getBlockIndex(_buff[i].getCol(), _bclen); + + //output block and switch to next index pair + if( bi != cbi || bj != cbj ) { + outputBlock(out, tmpIx, tmpBlock); + cbi = bi; + cbj = bj; + tmpIx = bi; + tmpBlock.reset(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen))); + } + + int ci = getIndexInBlock(_buff[i].getRow(), _brlen); + int cj = getIndexInBlock(_buff[i].getCol(), _bclen); + tmpBlock.set(ci, cj, _buff[i].getObjVal()); + } + + //output last block + outputBlock(out, tmpIx, tmpBlock); + + _count = 0; + } + + /** + * + * @param outList + * @throws IOException + * @throws DMLRuntimeException + */ + public void flushBufferToBinaryBlocks( ArrayList<Pair<Long, FrameBlock>> outList ) + throws IOException, DMLRuntimeException + { + if( _count == 0 ) + return; + + //Step 1) sort reblock buffer (blockwise, no in-block sorting!) + Arrays.sort( _buff, 0 ,_count, new FrameReblockBufferComparator() ); + + //Step 2) output blocks + Long tmpIx = -1L; + FrameBlock tmpBlock = new FrameBlock(_schema); + + //put values into block and output + long cbi = -1, cbj = -1; //current block indexes + for( int i=0; i<_count; i++ ) + { + long bi = getBlockIndex(_buff[i].getRow(), _brlen); + long bj = getBlockIndex(_buff[i].getCol(), _bclen); + + //output block and switch to next index pair + if( bi != cbi || bj != cbj ) { + if( cbi != -1 && cbj != -1) + outputBlock(outList, tmpIx, tmpBlock); + cbi = bi; + cbj = bj; + tmpIx = (bi-1)*_brlen+1; + tmpBlock = new FrameBlock(_schema); + tmpBlock.ensureAllocatedColumns(Math.min(_brlen, (int)(_rlen-(bi-1)*_brlen))); + + } + + int ci = getIndexInBlock(_buff[i].getRow(), _brlen); + int cj = getIndexInBlock(_buff[i].getCol(), _bclen); + tmpBlock.set(ci, cj, _buff[i].getObjVal()); + } + + //output last block + if( cbi != -1 && cbj != -1) + outputBlock(outList, tmpIx, tmpBlock); + + _count = 0; + } + + /** + * + * @param out + * @param key + * @param block + * @throws IOException + */ + private static void outputBlock( OutputCollector<Long, Writable> out, Long key, FrameBlock block ) + throws IOException + { + //skip output of unassigned blocks + if( key == -1) + return; + + //output block + out.collect(key, block); + } + + /** + * + * @param out + * @param key + * @param value + * @throws IOException + * @throws DMLRuntimeException + */ + private static void outputBlock( ArrayList<Pair<Long, FrameBlock>> out, Long key, FrameBlock value ) + throws IOException, DMLRuntimeException + { + //skip output of unassigned blocks + if( key == -1 ) + return; + + //output block + out.add(new Pair<Long, FrameBlock>(new Long(key), value)); + } + + private static class FrameCell + { + private int iRow; + private int iCol; + private Object objVal; + public int getRow() { + return iRow; + } + public void setRow(int iRow) { + this.iRow = iRow; + } + public int getCol() { + return iCol; + } + public void setCol(int iCol) { + this.iCol = iCol; + } + public Object getObjVal() { + return objVal; + } + public void setObjVal(Object objVal) { + this.objVal = objVal; + } + } + + /** + * Comparator to sort the reblock buffer by block indexes, where we + * compute the block indexes on-the-fly based on the given cell indexes. + * + */ + private class FrameReblockBufferComparator implements Comparator<FrameCell> + { + @Override + public int compare(FrameCell arg0, FrameCell arg1) + { + long bi0 = arg0.getRow(); + long bj0 = arg0.getCol(); + long bi1 = arg1.getRow(); + long bj1 = arg1.getCol(); + + return ( bi0 < bi1 || (bi0 == bi1 && bj0 < bj1) ) ? -1 : + (( bi0 == bi1 && bj0 == bj1)? 0 : 1); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b1b9e838/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index 7c0d4f0..05d752c 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -64,6 +65,8 @@ public class FrameConverterTest extends AutomatedTestBase private enum ConvType { CSV2BIN, BIN2CSV, + TXTCELL2BIN, + BIN2TXTCELL } @Override @@ -92,6 +95,26 @@ public class FrameConverterTest extends AutomatedTestBase runFrameConverterTest(schemaMixed, ConvType.BIN2CSV); } + @Test + public void testFrameStringsTxtCellBinSpark() { + runFrameConverterTest(schemaStrings, ConvType.TXTCELL2BIN); + } + + @Test + public void testFrameMixedTxtCellBinSpark() { + runFrameConverterTest(schemaMixed, ConvType.TXTCELL2BIN); + } + + @Test + public void testFrameStringsBinTxtCellSpark() { + runFrameConverterTest(schemaStrings, ConvType.BIN2TXTCELL); + } + + @Test + public void testFrameMixedBinTxtCellSpark() { + runFrameConverterTest(schemaMixed, ConvType.BIN2TXTCELL); + } + /** * @@ -106,6 +129,9 @@ public class FrameConverterTest extends AutomatedTestBase boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; DMLScript.USE_LOCAL_SPARK_CONFIG = true; + SparkConf conf = new SparkConf().setAppName("Frame").setMaster("local"); + conf.set("spark.kryo.classesToRegister", "org.apache.hadoop.io.LongWritable"); + try { TestConfiguration config = getTestConfiguration(TEST_NAME); @@ -126,6 +152,14 @@ public class FrameConverterTest extends AutomatedTestBase oinfo = OutputInfo.BinaryBlockOutputInfo; iinfo = InputInfo.CSVInputInfo; break; + case TXTCELL2BIN: + oinfo = OutputInfo.TextCellOutputInfo; + iinfo = InputInfo.BinaryBlockInputInfo; + break; + case BIN2TXTCELL: + oinfo = OutputInfo.BinaryBlockOutputInfo; + iinfo = InputInfo.TextCellInputInfo; + break; default: throw new RuntimeException("Unsuported converter type: "+type.toString()); } @@ -141,7 +175,7 @@ public class FrameConverterTest extends AutomatedTestBase //run converter under test MatrixCharacteristics mc = new MatrixCharacteristics(rows, schema.length, -1, -1, -1); - runConverter(type, mc, input("A"), output("B")); + runConverter(type, mc, Arrays.asList(schema), input("A"), output("B")); //read frame data from hdfs FrameReader reader = FrameReaderFactory.createFrameReader(iinfo); @@ -200,12 +234,13 @@ public class FrameConverterTest extends AutomatedTestBase * @param frame1 * @param frame2 * @param fprop + * @param schema * @return * @throws DMLRuntimeException, IOException */ @SuppressWarnings("unchecked") - private void runConverter(ConvType type, MatrixCharacteristics mc, String fnameIn, String fnameOut) + private void runConverter(ConvType type, MatrixCharacteristics mc, List<ValueType> schema, String fnameIn, String fnameOut) throws DMLRuntimeException, IOException { SparkExecutionContext sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); @@ -231,6 +266,22 @@ public class FrameConverterTest extends AutomatedTestBase rddOut.saveAsTextFile(fnameOut); break; } + case TXTCELL2BIN: { + InputInfo iinfo = InputInfo.TextCellInputInfo; + OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; + JavaPairRDD<LongWritable,Text> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass); + JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils + .textCellToBinaryBlock(sc, rddIn, mc, schema); + rddOut.saveAsHadoopFile(fnameOut, LongWritable.class, FrameBlock.class, oinfo.outputFormatClass); + break; + } + case BIN2TXTCELL: { + InputInfo iinfo = InputInfo.BinaryBlockInputInfo; + JavaPairRDD<LongWritable, FrameBlock> rddIn = sc.hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class); + JavaRDD<String> rddOut = FrameRDDConverterUtils.binaryBlockToStringRDD(rddIn, mc, "text"); + rddOut.saveAsTextFile(fnameOut); + break; + } } sec.close();
