Repository: incubator-systemml
Updated Branches:
  refs/heads/master 3aa32e50c -> eb1b8fa69
[SYSTEMML-1311] New libsvm to binary-block spark rdd converter

This patch adds a new libsvm to binary-block data converter, which converts
a libsvm file into binary block output files for features and labels.
Internally, it uses MLUtils.loadLibSVMFile for parsing the libsvm file in
order to ensure consistency with Spark. The converter also determines and
writes the corresponding meta data files.

On an 81M x 784 MNIST libsvm input file (~110GB), this converter took
17min24s, compared to 30min35s for the previously used chain of experimental
converters (libsvm -> labeledpoints -> binarycell -> binaryblock).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eb1b8fa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eb1b8fa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eb1b8fa6

Branch: refs/heads/master
Commit: eb1b8fa695a2f73ef8370b30e228a2b482854ae8
Parents: 3aa32e5
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Mon Mar 6 17:13:04 2017 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Mon Mar 6 17:13:12 2017 -0800

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java          | 176 +++++++++++++++++++
 .../instructions/spark/utils/SparkUtils.java    |   2 +-
 2 files changed, 177 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
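For context, the following sketch shows how a caller might drive the new
converter. It is illustrative only and not part of this commit: the paths and
the JavaSparkContext setup are assumptions, the dimensions match the MNIST
experiment quoted above, and 1000x1000 is SystemML's default block size.

    // Illustrative caller, not part of this commit; paths are made up.
    JavaSparkContext sc = new JavaSparkContext(); //settings from system properties
    MatrixCharacteristics mcX = new MatrixCharacteristics(
        81000000L, 784L, 1000, 1000, -1); //rows, cols, brlen, bclen, nnz unknown
    RDDConverterUtils.libsvmToBinaryBlock(sc, "hdfs:/tmp/mnist.libsvm",
        "hdfs:/tmp/mnist.X", "hdfs:/tmp/mnist.y", mcX);
    // On success, the output paths hold binary blocks in sequence files,
    // with meta data written to mnist.X.mtd and mnist.y.mtd respectively.

Note that the matrix characteristics must have known dimensions (dimsKnown());
the number of columns doubles as the numFeatures hint passed to
MLUtils.loadLibSVMFile.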
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb1b8fa6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index d1e6793..902924a 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -39,6 +40,7 @@ import org.apache.spark.ml.linalg.DenseVector;
 import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.ml.linalg.Vectors;
+import org.apache.spark.mllib.util.MLUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -46,9 +48,11 @@ import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.instructions.spark.data.SerLongWritable;
 import org.apache.sysml.runtime.instructions.spark.data.SerText;
@@ -59,11 +63,13 @@ import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixCell;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.ReblockBuffer;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
+import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 import scala.Tuple2;
@@ -297,6 +303,75 @@ public class RDDConverterUtils
 		return binaryBlockToDataFrame(sparkSession, in, mc, toVector);
 	}
 
+	/**
+	 * Converts a libsvm text input file into two binary block matrices for features
+	 * and labels, and saves these to the specified output files. This call also deletes
+	 * existing files at the specified output locations, and determines and writes the
+	 * meta data files of both output matrices.
+	 * <p>
+	 * Note: We use {@code org.apache.spark.mllib.util.MLUtils.loadLibSVMFile} for parsing
+	 * the libsvm input files in order to ensure consistency with Spark.
+	 *
+	 * @param sc java spark context
+	 * @param pathIn path to libsvm input file
+	 * @param pathX path to binary block output file of features
+	 * @param pathY path to binary block output file of labels
+	 * @param mcOutX matrix characteristics of output matrix X
+	 * @throws DMLRuntimeException if the output path is not writable or the conversion fails
+	 */
+	public static void libsvmToBinaryBlock(JavaSparkContext sc, String pathIn,
+			String pathX, String pathY, MatrixCharacteristics mcOutX)
+		throws DMLRuntimeException
+	{
+		if( !mcOutX.dimsKnown() )
+			throw new DMLRuntimeException("Matrix characteristics "
+				+ "required to convert sparse input representation.");
+		try {
+			//cleanup existing output files
+			MapReduceTool.deleteFileIfExistOnHDFS(pathX);
+			MapReduceTool.deleteFileIfExistOnHDFS(pathY);
+
+			//convert libsvm to labeled points
+			int numFeatures = (int) mcOutX.getCols();
+			int numPartitions = SparkUtils.getNumPreferredPartitions(mcOutX, null);
+			JavaRDD<org.apache.spark.mllib.regression.LabeledPoint> lpoints =
+				MLUtils.loadLibSVMFile(sc.sc(), pathIn, numFeatures, numPartitions).toJavaRDD();
+
+			//append row index and best-effort caching to avoid repeated text parsing
+			JavaPairRDD<org.apache.spark.mllib.regression.LabeledPoint,Long> ilpoints =
+				lpoints.zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK());
+
+			//extract labels and convert to binary block
+			MatrixCharacteristics mc1 = new MatrixCharacteristics(mcOutX.getRows(), 1,
+				mcOutX.getRowsPerBlock(), mcOutX.getColsPerBlock(), -1);
+			LongAccumulator aNnz1 = sc.sc().longAccumulator("nnz");
+			JavaPairRDD<MatrixIndexes,MatrixBlock> out1 = ilpoints
+				.mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc1, true, aNnz1));
+			int numPartitions2 = SparkUtils.getNumPreferredPartitions(mc1, null);
+			out1 = RDDAggregateUtils.mergeByKey(out1, numPartitions2, false);
+			out1.saveAsHadoopFile(pathY, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
+			mc1.setNonZeros(aNnz1.value()); //update nnz after triggered save
+			MapReduceTool.writeMetaDataFile(pathY+".mtd", ValueType.DOUBLE, mc1, OutputInfo.BinaryBlockOutputInfo);
+
+			//extract data and convert to binary block
+			MatrixCharacteristics mc2 = new MatrixCharacteristics(mcOutX.getRows(), mcOutX.getCols(),
+				mcOutX.getRowsPerBlock(), mcOutX.getColsPerBlock(), -1);
+			LongAccumulator aNnz2 = sc.sc().longAccumulator("nnz");
+			JavaPairRDD<MatrixIndexes,MatrixBlock> out2 = ilpoints
+				.mapPartitionsToPair(new LabeledPointToBinaryBlockFunction(mc2, false, aNnz2));
+			out2 = RDDAggregateUtils.mergeByKey(out2, numPartitions, false);
+			out2.saveAsHadoopFile(pathX, MatrixIndexes.class, MatrixBlock.class, SequenceFileOutputFormat.class);
+			mc2.setNonZeros(aNnz2.value()); //update nnz after triggered save
+			MapReduceTool.writeMetaDataFile(pathX+".mtd", ValueType.DOUBLE, mc2, OutputInfo.BinaryBlockOutputInfo);
+
+			//asynchronous cleanup of cached intermediates
+			ilpoints.unpersist(false);
+		}
+		catch(IOException ex) {
+			throw new DMLRuntimeException(ex);
+		}
+	}
+
 	public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in)
 	{
 		return in.mapToPair(new TextToSerTextFunction());
@@ -696,6 +771,107 @@ public class RDDConverterUtils
 		}
 	}
 
+	private static class LabeledPointToBinaryBlockFunction implements PairFlatMapFunction<Iterator<Tuple2<org.apache.spark.mllib.regression.LabeledPoint,Long>>,MatrixIndexes,MatrixBlock>
+	{
+		private static final long serialVersionUID = 2290124693964816276L;
+
+		private final long _rlen;
+		private final long _clen;
+		private final int _brlen;
+		private final int _bclen;
+		private final boolean _sparseX;
+		private final boolean _labels;
+		private final LongAccumulator _aNnz;
+
+		public LabeledPointToBinaryBlockFunction(MatrixCharacteristics mc, boolean labels, LongAccumulator aNnz) {
+			_rlen = mc.getRows();
+			_clen = mc.getCols();
+			_brlen = mc.getRowsPerBlock();
+			_bclen = mc.getColsPerBlock();
+			_sparseX = MatrixBlock.evalSparseFormatInMemory(
+				mc.getRows(), mc.getCols(), mc.getNonZeros());
+			_labels = labels;
+			_aNnz = aNnz;
+		}
+
+		@Override
+		public Iterator<Tuple2<MatrixIndexes, MatrixBlock>> call(Iterator<Tuple2<org.apache.spark.mllib.regression.LabeledPoint,Long>> arg0)
+			throws Exception
+		{
+			ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret = new ArrayList<Tuple2<MatrixIndexes,MatrixBlock>>();
+
+			int ncblks = (int)Math.ceil((double)_clen/_bclen);
+			MatrixIndexes[] ix = new MatrixIndexes[ncblks];
+			MatrixBlock[] mb = new MatrixBlock[ncblks];
+
+			while( arg0.hasNext() )
+			{
+				Tuple2<org.apache.spark.mllib.regression.LabeledPoint,Long> tmp = arg0.next();
+				org.apache.spark.mllib.regression.LabeledPoint row = tmp._1();
+				long rowix = tmp._2() + 1;
+
+				long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+				int pos = UtilFunctions.computeCellInBlock(rowix, _brlen);
+
+				//create new blocks for entire row
+				if( ix[0] == null || ix[0].getRowIndex() != rix ) {
+					if( ix[0] != null )
+						flushBlocksToList(ix, mb, ret);
+					long len = UtilFunctions.computeBlockSize(_rlen, rix, _brlen);
+					createBlocks(rowix, (int)len, ix, mb);
+				}
+
+				//process row data
+				if( _labels ) {
+					double val = row.label();
+					mb[0].appendValue(pos, 0, val);
+					_aNnz.add((val != 0) ? 1 : 0);
+				}
+				else { //features
+					for( int cix=1, pix=0; cix<=ncblks; cix++ ) {
+						int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+						for( int j=0; j<lclen; j++ )
+							mb[cix-1].appendValue(pos, j, row.features().apply(pix++));
+					}
+					_aNnz.add(row.features().numNonzeros());
+				}
+			}
+
+			//flush last blocks
+			flushBlocksToList(ix, mb, ret);
+
+			return ret.iterator();
+		}
+
+		// Creates new state of empty column blocks for current global row index.
+		private void createBlocks(long rowix, int lrlen, MatrixIndexes[] ix, MatrixBlock[] mb)
+		{
+			//compute row block index and number of column blocks
+			long rix = UtilFunctions.computeBlockIndex(rowix, _brlen);
+			int ncblks = (int)Math.ceil((double)_clen/_bclen);
+
+			//create all column blocks (dense or sparse, according to the
+			//sparse format estimate _sparseX over the entire output)
+			for( int cix=1; cix<=ncblks; cix++ ) {
+				int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+				ix[cix-1] = new MatrixIndexes(rix, cix);
+				mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparseX);
+				mb[cix-1].allocateDenseOrSparseBlock();
+			}
+		}
+
+		// Flushes current state of filled column blocks to output list.
+		private void flushBlocksToList( MatrixIndexes[] ix, MatrixBlock[] mb, ArrayList<Tuple2<MatrixIndexes,MatrixBlock>> ret )
+			throws DMLRuntimeException
+		{
+			int len = ix.length;
+			for( int i=0; i<len; i++ )
+				if( mb[i] != null ) {
+					ret.add(new Tuple2<MatrixIndexes,MatrixBlock>(ix[i],mb[i]));
+					mb[i].examSparsity(); //ensure right representation
+				}
+		}
+	}
+
 	private static class BinaryBlockToCSVFunction implements FlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>,String>
 	{
 		private static final long serialVersionUID = 1891768410987528573L;
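As background for LabeledPointToBinaryBlockFunction above: SystemML addresses
blocks with 1-based block indexes and 0-based in-block offsets. The following
is an illustrative re-implementation of the arithmetic behind
UtilFunctions.computeBlockIndex, computeCellInBlock, and computeBlockSize as
used here; it is a sketch for exposition, not the library code.

    //sketch of the 1-based blocking arithmetic (exposition only)
    static long blockIndex(long cellIx, int blen) {
        return (cellIx - 1) / blen + 1;    //e.g., rows 1..1000 -> row block 1
    }
    static int cellInBlock(long cellIx, int blen) {
        return (int)((cellIx - 1) % blen); //0-based offset within the block
    }
    static long blockSize(long len, long blockIx, int blen) {
        return Math.min(blen, len - (blockIx - 1) * blen); //boundary blocks may be smaller
    }

With brlen=1000, global row 2500 hence lands in row block 3 at in-block
position 499, which is exactly how rix and pos are derived in call().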
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb1b8fa6/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
index 2fe3981..b9f54f2 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/SparkUtils.java
@@ -117,7 +117,7 @@ public class SparkUtils
 	}
 
 	public static int getNumPreferredPartitions(MatrixCharacteristics mc, JavaPairRDD<?,?> in) {
-		if( !mc.dimsKnown(true) )
+		if( !mc.dimsKnown(true) && in != null )
 			return in.getNumPartitions();
 		double hdfsBlockSize = InfrastructureAnalyzer.getHDFSBlockSize();
 		double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(mc);
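The one-line SparkUtils change above is what allows libsvmToBinaryBlock to
call getNumPreferredPartitions(mc, null): without the added null guard, a
matrix with unknown nnz would unconditionally dereference the absent input
RDD. A sketch of the two call patterns (mcUnknownNnz and someRdd are
hypothetical names, not from this commit):

    //hypothetical call sites; mcUnknownNnz has known dims but nnz == -1
    int n1 = SparkUtils.getNumPreferredPartitions(mcUnknownNnz, someRdd);
        //falls back to someRdd.getNumPartitions()
    int n2 = SparkUtils.getNumPreferredPartitions(mcUnknownNnz, null);
        //now proceeds to the size-based estimate instead of throwing an NPE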