Repository: incubator-systemml
Updated Branches:
  refs/heads/master d5bb9cc2f -> 8a0df5b85
[SYSTEMML-566] Consolidated seq/par matrix writers (binary,text,mm,csv) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/29ad1434 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/29ad1434 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/29ad1434 Branch: refs/heads/master Commit: 29ad1434ffdb5026f0c71f1526d42d489dd02f06 Parents: d5bb9cc Author: Matthias Boehm <[email protected]> Authored: Fri Jun 10 18:16:58 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jun 10 19:23:16 2016 -0700 ---------------------------------------------------------------------- .../sysml/runtime/io/WriterBinaryBlock.java | 78 +++++---- .../runtime/io/WriterBinaryBlockParallel.java | 99 ++--------- .../sysml/runtime/io/WriterMatrixMarket.java | 76 ++++---- .../runtime/io/WriterMatrixMarketParallel.java | 110 ++---------- .../apache/sysml/runtime/io/WriterTextCSV.java | 49 ++++-- .../sysml/runtime/io/WriterTextCSVParallel.java | 175 ++----------------- .../apache/sysml/runtime/io/WriterTextCell.java | 54 +++--- .../runtime/io/WriterTextCellParallel.java | 89 ++-------- 8 files changed, 199 insertions(+), 531 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java index a3e0ebf..4ef1334 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java @@ -40,32 +40,36 @@ public class WriterBinaryBlock extends MatrixWriter { protected int _replication = -1; - public WriterBinaryBlock( 
int replication ) - { + public WriterBinaryBlock( int replication ) { _replication = replication; } @Override - public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) throws IOException, DMLRuntimeException { //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); //if the file already exists on HDFS, remove it. MapReduceTool.deleteFileIfExistOnHDFS( fname ); - - //core write + + //set up preferred custom serialization framework for binary block format + if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) + MRJobConfiguration.addBinaryBlockSerializationFramework( job ); + + //core write sequential/parallel if( src.isDiag() ) - writeDiagBinaryBlockMatrixToHDFS(path, job, src, rlen, clen, brlen, bclen, _replication); + writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen); else - writeBinaryBlockMatrixToHDFS(path, job, src, rlen, clen, brlen, bclen, _replication); + writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen); } @Override @SuppressWarnings("deprecation") - public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) + public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) throws IOException, DMLRuntimeException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); @@ -94,25 +98,42 @@ public class WriterBinaryBlock extends MatrixWriter * @throws IOException * @throws DMLRuntimeException */ - @SuppressWarnings("deprecation") - protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, int replication ) + protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf 
job, FileSystem fs, MatrixBlock src, long rlen, long clen, int brlen, int bclen ) throws IOException, DMLRuntimeException { + //sequential write + writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, brlen, bclen, 0, (int)rlen); + } + + /** + * + * @param path + * @param job + * @param fs + * @param src + * @param brlen + * @param bclen + * @param rl + * @param ru + * @throws DMLRuntimeException + * @throws IOException + */ + @SuppressWarnings("deprecation") + protected final void writeBinaryBlockMatrixToSequenceFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int brlen, int bclen, int rl, int ru ) + throws DMLRuntimeException, IOException + { boolean sparse = src.isInSparseFormat(); - FileSystem fs = FileSystem.get(job); - - //set up preferred custom serialization framework for binary block format - if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) - MRJobConfiguration.addBinaryBlockSerializationFramework( job ); + int rlen = src.getNumRows(); + int clen = src.getNumColumns(); // 1) create sequence file writer, with right replication factor // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication()) SequenceFile.Writer writer = null; - if( replication > 0 ) //if replication specified (otherwise default) + if( _replication > 0 ) //if replication specified (otherwise default) { //copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096), - (short)replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); + (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); } else { @@ -131,7 +152,7 @@ public class WriterBinaryBlock extends MatrixWriter //3) reblock and write MatrixIndexes indexes = new MatrixIndexes(); - if( rlen <= brlen && clen <= 
bclen ) //opt for single block + if( rlen <= brlen && clen <= bclen && rl == 0 ) //opt for single block { //directly write single block indexes.setIndexes(1, 1); @@ -143,7 +164,7 @@ public class WriterBinaryBlock extends MatrixWriter MatrixBlock[] blocks = createMatrixBlocksForReuse(rlen, clen, brlen, bclen, sparse, src.getNonZeros()); //create and write subblocks of matrix - for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++) + for(int blockRow = rl/brlen; blockRow < (int)Math.ceil(ru/(double)brlen); blockRow++) for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++) { int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen; @@ -168,8 +189,7 @@ public class WriterBinaryBlock extends MatrixWriter } } } - finally - { + finally { IOUtilFunctions.closeSilently(writer); } } @@ -188,24 +208,19 @@ public class WriterBinaryBlock extends MatrixWriter * @throws DMLRuntimeException */ @SuppressWarnings("deprecation") - protected void writeDiagBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, int replication ) + protected final void writeDiagBinaryBlockMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int brlen, int bclen ) throws IOException, DMLRuntimeException { boolean sparse = src.isInSparseFormat(); - FileSystem fs = FileSystem.get(job); - - //set up preferred custom serialization framework for binary block format - if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) - MRJobConfiguration.addBinaryBlockSerializationFramework( job ); // 1) create sequence file writer, with right replication factor // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication()) SequenceFile.Writer writer = null; - if( replication > 0 ) //if replication specified (otherwise default) + if( 
_replication > 0 ) //if replication specified (otherwise default) { //copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096), - (short)replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); + (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); } else { @@ -272,8 +287,7 @@ public class WriterBinaryBlock extends MatrixWriter } } } - finally - { + finally { IOUtilFunctions.closeSilently(writer); } } @@ -292,7 +306,7 @@ public class WriterBinaryBlock extends MatrixWriter * @throws DMLRuntimeException */ @SuppressWarnings("deprecation") - public void writePartitionedBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, PDataPartitionFormat pformat ) + public final void writePartitionedBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, PDataPartitionFormat pformat ) throws IOException, DMLRuntimeException { boolean sparse = src.isInSparseFormat(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java index b316f8b..16e01a3 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java @@ -29,44 +29,27 @@ import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapred.JobConf; 
import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; -import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; -import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.MapReduceTool; public class WriterBinaryBlockParallel extends WriterBinaryBlock { - public WriterBinaryBlockParallel( int replication ) - { + public WriterBinaryBlockParallel( int replication ) { super(replication); } - /** - * - * @param path - * @param job - * @param src - * @param rlen - * @param clen - * @param brlen - * @param bclen - * @throws IOException - * @throws DMLRuntimeException - */ @Override - protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, int replication ) + protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int brlen, int bclen ) throws IOException, DMLRuntimeException { //estimate output size and number of output blocks (min 1) - int numPartFiles = (int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, brlen, bclen, src.getNonZeros()) - / InfrastructureAnalyzer.getHDFSBlockSize()); + int numPartFiles = (int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, + brlen, bclen, src.getNonZeros()) / InfrastructureAnalyzer.getHDFSBlockSize()); numPartFiles = Math.max(numPartFiles, 1); //determine degree of parallelism @@ -75,17 +58,12 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file if( numThreads <= 1 ) { - super.writeBinaryBlockMatrixToHDFS(path, job, src, rlen, 
clen, brlen, bclen, replication); + super.writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen); return; } - - //set up preferred custom serialization framework for binary block format - if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) - MRJobConfiguration.addBinaryBlockSerializationFramework( job ); //create directory for concurrent tasks MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); - FileSystem fs = FileSystem.get(job); //create and execute write tasks try @@ -95,7 +73,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock int blklen = (int)Math.ceil((double)rlen / brlen / numThreads) * brlen; for(int i=0; i<numThreads & i*blklen<rlen; i++) { Path newPath = new Path(path, String.format("0-m-%05d",i)); - tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, Math.min((i+1)*blklen, rlen), brlen, bclen, _replication)); + tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, Math.min((i+1)*blklen, rlen), brlen, bclen)); } //wait until all tasks have been executed @@ -114,7 +92,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock /** * */ - private static class WriteFileTask implements Callable<Object> + private class WriteFileTask implements Callable<Object> { private Path _path = null; private JobConf _job = null; @@ -124,10 +102,8 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock private long _ru = -1; private int _brlen = -1; private int _bclen = -1; - private int _replication = 1; - public WriteFileTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, long rl, long ru, int brlen, int bclen, int rep) - { + public WriteFileTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, long rl, long ru, int brlen, int bclen) { _path = path; _fs = fs; _job = job; @@ -136,66 +112,13 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock _ru = ru; _brlen = brlen; _bclen = bclen; - _replication = rep; } 
@Override - @SuppressWarnings("deprecation") - public Object call() throws Exception + public Object call() + throws Exception { - // 1) create sequence file writer, with right replication factor - // (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication()) - SequenceFile.Writer writer = null; - if( _replication > 0 ) //if replication specified (otherwise default) - { - //copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication - writer = new SequenceFile.Writer(_fs, _job, _path, MatrixIndexes.class, MatrixBlock.class, _job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096), - (short)_replication, _fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); - } - else - { - writer = new SequenceFile.Writer(_fs, _job, _path, MatrixIndexes.class, MatrixBlock.class); - } - - try - { - //3) reblock and write - MatrixIndexes indexes = new MatrixIndexes(); - - //initialize blocks for reuse (at most 4 different blocks required) - MatrixBlock[] blocks = createMatrixBlocksForReuse(_src.getNumRows(), _src.getNumColumns(), - _brlen, _bclen, _src.isInSparseFormat(), _src.getNonZeros()); - - //create and write subblocks of matrix - for(int blockRow = (int)_rl/_brlen; blockRow < (int)Math.ceil(_ru/(double)_brlen); blockRow++) - for(int blockCol = 0; blockCol < (int)Math.ceil(_src.getNumColumns()/(double)_bclen); blockCol++) - { - int maxRow = (blockRow*_brlen + _brlen < _src.getNumRows()) ? _brlen : _src.getNumRows() - blockRow*_brlen; - int maxCol = (blockCol*_bclen + _bclen < _src.getNumColumns()) ? 
_bclen : _src.getNumColumns() - blockCol*_bclen; - - int row_offset = blockRow*_brlen; - int col_offset = blockCol*_bclen; - - //get reuse matrix block - MatrixBlock block = getMatrixBlockForReuse(blocks, maxRow, maxCol, _brlen, _bclen); - - //copy submatrix to block - _src.sliceOperations( row_offset, row_offset+maxRow-1, - col_offset, col_offset+maxCol-1, block ); - - //append block to sequence file - indexes.setIndexes(blockRow+1, blockCol+1); - writer.append(indexes, block); - - //reset block for later reuse - block.reset(); - } - } - finally - { - IOUtilFunctions.closeSilently(writer); - } - + writeBinaryBlockMatrixToSequenceFile(_path, _job, _fs, _src, _brlen, _bclen, (int)_rl, (int)_ru); return null; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java index 42d504a..9ae359a 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java @@ -45,7 +45,7 @@ import org.apache.sysml.runtime.util.MapReduceTool; public class WriterMatrixMarket extends MatrixWriter { @Override - public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) throws IOException, DMLRuntimeException { //validity check matrix dimensions @@ -55,17 +55,18 @@ public class WriterMatrixMarket extends MatrixWriter //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); //if the file already exists on HDFS, remove it. 
MapReduceTool.deleteFileIfExistOnHDFS( fname ); //core write - writeMatrixMarketMatrixToHDFS(path, job, src, rlen, clen, nnz); + writeMatrixMarketMatrixToHDFS(path, job, fs, src); } @Override - public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) + public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) throws IOException, DMLRuntimeException { Path path = new Path( fname ); @@ -85,41 +86,51 @@ public class WriterMatrixMarket extends MatrixWriter * @param nnz * @throws IOException */ - protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, long nnz ) + protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src ) + throws IOException + { + //sequential write + writeMatrixMarketMatrixToFile(path, job, fs, src, 0, src.getNumRows()); + } + + /** + * + * @param path + * @param job + * @param src + * @param rl + * @param ru + * @throws IOException + */ + protected final void writeMatrixMarketMatrixToFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru ) throws IOException { boolean sparse = src.isInSparseFormat(); - boolean entriesWritten = false; - FileSystem fs = FileSystem.get(job); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - - int rows = src.getNumRows(); - int cols = src.getNumColumns(); - - //bound check per block - if( rows > rlen || cols > clen ) - { - throw new IOException("Matrix block [1:"+rows+",1:"+cols+"] " + - "out of overall matrix range [1:"+rlen+",1:"+clen+"]."); - } + int rlen = src.getNumRows(); + int clen = src.getNumColumns(); + long nnz = src.getNonZeros(); + BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); + try { //for obj reuse and preventing repeated buffer re-allocations StringBuilder sb = new StringBuilder(); - // First output MM header - sb.append ("%%MatrixMarket 
matrix coordinate real general\n"); - - // output number of rows, number of columns and number of nnz - sb.append (rlen + " " + clen + " " + nnz + "\n"); - br.write( sb.toString()); - sb.setLength(0); - + if( rl == 0 ) { + // First output MM header + sb.append ("%%MatrixMarket matrix coordinate real general\n"); + + // output number of rows, number of columns and number of nnz + sb.append (rlen + " " + clen + " " + nnz + "\n"); + br.write( sb.toString()); + sb.setLength(0); + } + // output matrix cell if( sparse ) //SPARSE { - Iterator<IJV> iter = src.getSparseBlockIterator(); + Iterator<IJV> iter = src.getSparseBlockIterator(rl, ru); while( iter.hasNext() ) { IJV cell = iter.next(); @@ -132,15 +143,14 @@ public class WriterMatrixMarket extends MatrixWriter sb.append('\n'); br.write( sb.toString() ); //same as append sb.setLength(0); - entriesWritten = true; } } else //DENSE { - for( int i=0; i<rows; i++ ) + for( int i=rl; i<ru; i++ ) { String rowIndex = Integer.toString(i+1); - for( int j=0; j<cols; j++ ) + for( int j=0; j<clen; j++ ) { double lvalue = src.getValueDenseUnsafe(i, j); if( lvalue != 0 ) //for nnz @@ -153,19 +163,17 @@ public class WriterMatrixMarket extends MatrixWriter sb.append('\n'); br.write( sb.toString() ); //same as append sb.setLength(0); - entriesWritten = true; } } } } //handle empty result - if ( !entriesWritten ) { + if ( src.isEmptyBlock(false) && rl==0 ) { br.write("1 1 0\n"); } } - finally - { + finally { IOUtilFunctions.closeSilently(br); } } @@ -179,7 +187,7 @@ public class WriterMatrixMarket extends MatrixWriter * @param nnz * @throws IOException */ - public void mergeTextcellToMatrixMarket( String srcFileName, String fileName, long rlen, long clen, long nnz ) + public final void mergeTextcellToMatrixMarket( String srcFileName, String fileName, long rlen, long clen, long nnz ) throws IOException { Configuration conf = new Configuration(ConfigurationManager.getCachedJobConf()); 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java index 6e9a762..34d392e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java @@ -19,11 +19,8 @@ package org.apache.sysml.runtime.io; -import java.io.BufferedWriter; import java.io.IOException; -import java.io.OutputStreamWriter; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -36,7 +33,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.MapReduceTool; @@ -46,22 +42,15 @@ import org.apache.sysml.runtime.util.MapReduceTool; */ public class WriterMatrixMarketParallel extends WriterMatrixMarket { - /** - * - * @param fileName - * @param src - * @param rlen - * @param clen - * @param nnz - * @throws IOException - */ @Override - protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, long nnz ) + protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src ) throws IOException { + int rlen = src.getNumRows(); + //estimate output size and number of output blocks (min 1) - int numPartFiles = 
(int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), src.getNonZeros(), - OutputInfo.MatrixMarketOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()); + int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), + src.getNonZeros(), OutputInfo.MatrixMarketOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()); numPartFiles = Math.max(numPartFiles, 1); //determine degree of parallelism @@ -70,7 +59,7 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file if( numThreads <= 1 ) { - super.writeMatrixMarketMatrixToHDFS(path, job, src, rlen, clen, nnz); + super.writeMatrixMarketMatrixToHDFS(path, job, fs, src); return; } @@ -85,7 +74,7 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { Path newPath = new Path(path, String.format("0-m-%05d",i)); - tasks.add(new WriteMMTask(newPath, job, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); + tasks.add(new WriteMMTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); } //wait until all tasks have been executed @@ -105,18 +94,19 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket * * */ - private static class WriteMMTask implements Callable<Object> + private class WriteMMTask implements Callable<Object> { private JobConf _job = null; + private FileSystem _fs = null; private MatrixBlock _src = null; private Path _path =null; private int _rl = -1; private int _ru = -1; - public WriteMMTask(Path path, JobConf job, MatrixBlock src, int rl, int ru) - { + public WriteMMTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru) { _path = path; _job = job; + _fs = fs; _src = src; _rl = rl; _ru = ru; @@ -125,83 +115,7 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket @Override public Object call() throws Exception { - boolean entriesWritten = false; - FileSystem fs = FileSystem.get(_job); - BufferedWriter bw = null; - - int rows = _src.getNumRows(); - int cols = _src.getNumColumns(); - long nnz = _src.getNonZeros(); - - try - { - //for obj reuse and preventing repeated buffer re-allocations - StringBuilder sb = new StringBuilder(); - bw = new BufferedWriter(new OutputStreamWriter(fs.create(_path,true))); - - if( _rl == 0 ) { - // First output MM header - sb.append ("%%MatrixMarket matrix coordinate real general\n"); - - // output number of rows, number of columns and number of nnz - sb.append (rows + " " + cols + " " + nnz + "\n"); - bw.write( sb.toString()); - sb.setLength(0); - } - - if( _src.isInSparseFormat() ) //SPARSE - { - Iterator<IJV> iter = _src.getSparseBlockIterator(_rl, _ru); - - while( iter.hasNext() ) - { - IJV cell = iter.next(); - - sb.append(cell.getI()+1); - sb.append(' '); - sb.append(cell.getJ()+1); - sb.append(' '); - sb.append(cell.getV()); - sb.append('\n'); - bw.write( sb.toString() ); - sb.setLength(0); - entriesWritten = true; - } - } - else //DENSE - { - for( int i=_rl; i<_ru; i++ ) - { - String rowIndex = Integer.toString(i+1); - for( int j=0; j<cols; j++ ) - { - double lvalue = _src.getValueDenseUnsafe(i, j); - if( lvalue != 0 ) //for nnz - { - sb.append(rowIndex); - sb.append(' '); - sb.append( j+1 ); - sb.append(' '); - sb.append( lvalue ); - sb.append('\n'); - bw.write( sb.toString() ); - sb.setLength(0); - entriesWritten = true; - } - } - } - } - - //handle empty result - if ( !entriesWritten ) { - bw.write("1 1 0\n"); - } - } - finally - { - IOUtilFunctions.closeSilently(bw); - } - + writeMatrixMarketMatrixToFile(_path, _job, _fs, _src, _rl, _ru); return null; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java index f18a8ec..7e2ce90 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java @@ -58,7 +58,7 @@ public class WriterTextCSV extends MatrixWriter } @Override - public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) throws IOException, DMLRuntimeException { //validity check matrix dimensions @@ -68,24 +68,42 @@ public class WriterTextCSV extends MatrixWriter //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); //if the file already exists on HDFS, remove it. MapReduceTool.deleteFileIfExistOnHDFS( fname ); - //core write - writeCSVMatrixToHDFS(path, job, src, rlen, clen, nnz, _props); + //core write (sequential/parallel) + writeCSVMatrixToHDFS(path, job, fs, src, _props); } @Override - public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) + public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) throws IOException, DMLRuntimeException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); MatrixBlock src = new MatrixBlock((int)rlen, 1, true); - writeCSVMatrixToHDFS(path, job, src, brlen, clen, 0, _props); + writeCSVMatrixToHDFS(path, job, fs, src, _props); + } + + /** + * + * @param path + * @param job + * @param fs + * @param src + * @param csvprops + * @throws IOException + */ + protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops) + throws 
IOException + { + //sequential write csv file + writeCSVMatrixToFile(path, job, fs, src, 0, (int)src.getNumRows(), csvprops); } /** @@ -97,12 +115,14 @@ public class WriterTextCSV extends MatrixWriter * @param nnz * @throws IOException */ - protected void writeCSVMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, long nnz, CSVFileFormatProperties props ) + protected final void writeCSVMatrixToFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props ) throws IOException { boolean sparse = src.isInSparseFormat(); - FileSystem fs = FileSystem.get(job); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); + int clen = src.getNumColumns(); + + //create buffered writer + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); try { @@ -114,7 +134,7 @@ public class WriterTextCSV extends MatrixWriter boolean csvsparse = props.isSparse(); // Write header line, if needed - if( props.hasHeader() ) + if( props.hasHeader() && rl==0 ) { //write row chunk-wise to prevent OOM on large number of columns for( int bj=0; bj<clen; bj+=BLOCKSIZE_J ) @@ -137,7 +157,7 @@ public class WriterTextCSV extends MatrixWriter if( sparse ) //SPARSE { SparseBlock sblock = src.getSparseBlock(); - for(int i=0; i < rlen; i++) + for(int i=rl; i < ru; i++) { //write row chunk-wise to prevent OOM on large number of columns int prev_jix = -1; @@ -204,7 +224,7 @@ public class WriterTextCSV extends MatrixWriter } else //DENSE { - for( int i=0; i<rlen; i++ ) + for( int i=rl; i<ru; i++ ) { //write row chunk-wise to prevent OOM on large number of columns for( int bj=0; bj<clen; bj+=BLOCKSIZE_J ) @@ -230,8 +250,7 @@ public class WriterTextCSV extends MatrixWriter } } } - finally - { + finally { IOUtilFunctions.closeSilently(br); } } @@ -251,7 +270,7 @@ public class WriterTextCSV extends MatrixWriter * @param clen * @throws IOException */ - public void 
mergeCSVPartFiles(String srcFileName, String destFileName, CSVFileFormatProperties csvprop, long rlen, long clen) + public final void mergeCSVPartFiles(String srcFileName, String destFileName, CSVFileFormatProperties csvprop, long rlen, long clen) throws IOException { Configuration conf = new Configuration(ConfigurationManager.getCachedJobConf()); @@ -331,7 +350,7 @@ public class WriterTextCSV extends MatrixWriter * @throws IOException */ @SuppressWarnings("unchecked") - public void addHeaderToCSV(String srcFileName, String destFileName, long rlen, long clen) + public final void addHeaderToCSV(String srcFileName, String destFileName, long rlen, long clen) throws IOException { Configuration conf = new Configuration(ConfigurationManager.getCachedJobConf()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java index cd400fe..01ad579 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java @@ -19,9 +19,7 @@ package org.apache.sysml.runtime.io; -import java.io.BufferedWriter; import java.io.IOException; -import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -38,7 +36,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; -import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.util.MapReduceTool; /** @@ -50,22 +47,12 @@ public class 
WriterTextCSVParallel extends WriterTextCSV super( props ); } - /** - * - * @param fileName - * @param src - * @param rlen - * @param clen - * @param nnz - * @throws IOException - */ - @Override - protected void writeCSVMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, long nnz, CSVFileFormatProperties props ) - throws IOException + protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, CSVFileFormatProperties csvprops) + throws IOException { //estimate output size and number of output blocks (min 1) - int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), src.getNonZeros(), - OutputInfo.CSVOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()); + int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), + src.getNonZeros(), OutputInfo.CSVOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()); numPartFiles = Math.max(numPartFiles, 1); //determine degree of parallelism @@ -74,7 +61,7 @@ public class WriterTextCSVParallel extends WriterTextCSV //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file if( numThreads <= 1 ) { - super.writeCSVMatrixToHDFS(path, job, src, rlen, clen, nnz, props); + super.writeCSVMatrixToHDFS(path, job, fs, src, csvprops); return; } @@ -86,10 +73,11 @@ public class WriterTextCSVParallel extends WriterTextCSV { ExecutorService pool = Executors.newFixedThreadPool(numThreads); ArrayList<WriteCSVTask> tasks = new ArrayList<WriteCSVTask>(); + int rlen = src.getNumRows(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { Path newPath = new Path(path, String.format("0-m-%05d",i)); - tasks.add(new WriteCSVTask(newPath, job, src, i*blklen, (int)Math.min((i+1)*blklen, rlen), props)); + tasks.add(new WriteCSVTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops)); } //wait until 
all tasks have been executed @@ -102,27 +90,28 @@ public class WriterTextCSVParallel extends WriterTextCSV } catch (Exception e) { throw new IOException("Failed parallel write of csv output.", e); - } - } + } + } /** * * */ - private static class WriteCSVTask implements Callable<Object> + private class WriteCSVTask implements Callable<Object> { private JobConf _job = null; + private FileSystem _fs = null; private MatrixBlock _src = null; private Path _path =null; private int _rl = -1; private int _ru = -1; private CSVFileFormatProperties _props = null; - public WriteCSVTask(Path path, JobConf job, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props) - { + public WriteCSVTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props) { _path = path; _job = job; + _fs = fs; _src = src; _rl = rl; _ru = ru; @@ -132,143 +121,7 @@ public class WriterTextCSVParallel extends WriterTextCSV @Override public Object call() throws Exception { - FileSystem _fs = FileSystem.get(_job); - BufferedWriter bw = null; - - boolean sparse = _src.isInSparseFormat(); - int cols = _src.getNumColumns(); - - try - { - //for obj reuse and preventing repeated buffer re-allocations - StringBuilder sb = new StringBuilder(); - bw = new BufferedWriter(new OutputStreamWriter(_fs.create(_path,true))); - - _props = (_props==null)? 
new CSVFileFormatProperties() : _props; - String delim = _props.getDelim(); //Pattern.quote(csvProperties.getDelim()); - boolean csvsparse = _props.isSparse(); - - // Write header line, if needed - if( _props.hasHeader() && _rl == 0 ) - { - //write row chunk-wise to prevent OOM on large number of columns - for( int bj=0; bj<cols; bj+=WriterTextCSV.BLOCKSIZE_J ) - { - for( int j=bj; j < Math.min(cols,bj+WriterTextCSV.BLOCKSIZE_J); j++) - { - sb.append("C"+ (j+1)); - if ( j < cols-1 ) - sb.append(delim); - } - bw.write( sb.toString() ); - sb.setLength(0); - } - sb.append('\n'); - bw.write( sb.toString() ); - sb.setLength(0); - } - - // Write data lines - if( sparse ) //SPARSE - { - SparseBlock sblock = _src.getSparseBlock(); - for( int i=_rl; i<_ru; i++ ) - { - //write row chunk-wise to prevent OOM on large number of columns - int prev_jix = -1; - if( sblock!=null && i<sblock.numRows() - && !sblock.isEmpty(i) ) - { - int pos = sblock.pos(i); - int alen = sblock.size(i); - int[] aix = sblock.indexes(i); - double[] avals = sblock.values(i); - - for(int j=pos; j<pos+alen; j++) - { - int jix = aix[j]; - - // output empty fields, if needed - for( int j2=prev_jix; j2<jix-1; j2++ ) { - if( !csvsparse ) - sb.append('0'); - sb.append(delim); - - //flush buffered string - if( j2%WriterTextCSV.BLOCKSIZE_J==0 ){ - bw.write( sb.toString() ); - sb.setLength(0); - } - } - - // output the value (non-zero) - sb.append( avals[j] ); - if( jix < cols-1) - sb.append(delim); - bw.write( sb.toString() ); - sb.setLength(0); - - //flush buffered string - if( jix%WriterTextCSV.BLOCKSIZE_J==0 ){ - bw.write( sb.toString() ); - sb.setLength(0); - } - - prev_jix = jix; - } - } - - // Output empty fields at the end of the row. 
- // In case of an empty row, output (clen-1) empty fields - for( int bj=prev_jix+1; bj<cols; bj+=WriterTextCSV.BLOCKSIZE_J ) - { - for( int j = bj; j < Math.min(cols,bj+WriterTextCSV.BLOCKSIZE_J); j++) { - if( !csvsparse ) - sb.append('0'); - if( j < cols-1 ) - sb.append(delim); - } - bw.write( sb.toString() ); - sb.setLength(0); - } - - sb.append('\n'); - bw.write( sb.toString() ); - sb.setLength(0); - } - } - else //DENSE - { - for( int i=_rl; i<_ru; i++ ) - { - //write row chunk-wise to prevent OOM on large number of columns - for( int bj=0; bj<cols; bj+=WriterTextCSV.BLOCKSIZE_J ) - { - for( int j=bj; j<Math.min(cols,bj+WriterTextCSV.BLOCKSIZE_J); j++ ) - { - double lvalue = _src.getValueDenseUnsafe(i, j); - if( lvalue != 0 ) //for nnz - sb.append(lvalue); - else if( !csvsparse ) - sb.append('0'); - - if( j != cols-1 ) - sb.append(delim); - } - bw.write( sb.toString() ); - sb.setLength(0); - } - - sb.append('\n'); - bw.write( sb.toString() ); //same as append - sb.setLength(0); - } - } - } - finally - { - IOUtilFunctions.closeSilently(bw); - } + writeCSVMatrixToFile(_path, _job, _fs, _src, _rl, _ru, _props); return null; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java index dd421ed..e32172e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java @@ -37,7 +37,7 @@ import org.apache.sysml.runtime.util.MapReduceTool; public class WriterTextCell extends MatrixWriter { @Override - public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, 
int brlen, int bclen, long nnz) throws IOException, DMLRuntimeException { //validity check matrix dimensions @@ -47,17 +47,18 @@ public class WriterTextCell extends MatrixWriter //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); //if the file already exists on HDFS, remove it. MapReduceTool.deleteFileIfExistOnHDFS( fname ); //core write - writeTextCellMatrixToHDFS(path, job, src, rlen, clen); + writeTextCellMatrixToHDFS(path, job, fs, src, rlen, clen); } @Override - public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) + public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) throws IOException, DMLRuntimeException { Path path = new Path( fname ); @@ -79,24 +80,30 @@ public class WriterTextCell extends MatrixWriter * @param bclen * @throws IOException */ - protected void writeTextCellMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen ) + protected void writeTextCellMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen ) + throws IOException + { + //sequential write text cell file + writeTextCellMatrixToFile(path, job, fs, src, 0, (int)rlen); + } + + /** + * + * @param path + * @param job + * @param src + * @param rl + * @param ru + * @throws IOException + */ + protected final void writeTextCellMatrixToFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru ) throws IOException { boolean sparse = src.isInSparseFormat(); - boolean entriesWritten = false; - FileSystem fs = FileSystem.get(job); - BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); + int clen = src.getNumColumns(); - int rows = src.getNumRows(); - int cols = src.getNumColumns(); + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - //bound check per block - if( rows 
> rlen || cols > clen ) - { - throw new IOException("Matrix block [1:"+rows+",1:"+cols+"] " + - "out of overall matrix range [1:"+rlen+",1:"+clen+"]."); - } - try { //for obj reuse and preventing repeated buffer re-allocations @@ -104,7 +111,7 @@ public class WriterTextCell extends MatrixWriter if( sparse ) //SPARSE { - Iterator<IJV> iter = src.getSparseBlockIterator(); + Iterator<IJV> iter = src.getSparseBlockIterator(rl, ru); while( iter.hasNext() ) { IJV cell = iter.next(); @@ -117,15 +124,14 @@ public class WriterTextCell extends MatrixWriter sb.append('\n'); br.write( sb.toString() ); //same as append sb.setLength(0); - entriesWritten = true; } } else //DENSE { - for( int i=0; i<rows; i++ ) + for( int i=rl; i<ru; i++ ) { String rowIndex = Integer.toString(i+1); - for( int j=0; j<cols; j++ ) + for( int j=0; j<clen; j++ ) { double lvalue = src.getValueDenseUnsafe(i, j); if( lvalue != 0 ) //for nnz @@ -138,7 +144,6 @@ public class WriterTextCell extends MatrixWriter sb.append('\n'); br.write( sb.toString() ); //same as append sb.setLength(0); - entriesWritten = true; } } @@ -146,13 +151,12 @@ public class WriterTextCell extends MatrixWriter } //handle empty result - if ( !entriesWritten ) { + if ( src.isEmptyBlock(false) && rl==0 ) { br.write("1 1 0\n"); } } - finally - { + finally { IOUtilFunctions.closeSilently(br); } - } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java index b6d3c33..b758435 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java @@ -19,11 +19,8 @@ package org.apache.sysml.runtime.io; -import java.io.BufferedWriter; import 
java.io.IOException; -import java.io.OutputStreamWriter; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -36,7 +33,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; -import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.MapReduceTool; @@ -55,12 +51,12 @@ public class WriterTextCellParallel extends WriterTextCell * @throws IOException */ @Override - protected void writeTextCellMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen ) + protected void writeTextCellMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen ) throws IOException { //estimate output size and number of output blocks (min 1) - int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), src.getNonZeros(), - OutputInfo.TextCellOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()); + int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), + src.getNonZeros(), OutputInfo.TextCellOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()); numPartFiles = Math.max(numPartFiles, 1); //determine degree of parallelism @@ -68,8 +64,8 @@ public class WriterTextCellParallel extends WriterTextCell numThreads = Math.min(numThreads, numPartFiles); //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file - if( numThreads <= 1 ) { - super.writeTextCellMatrixToHDFS(path, job, src, rlen, clen); + if( numThreads <= 1 || src.getNonZeros()==0 ) { + super.writeTextCellMatrixToHDFS(path, job, fs, src, rlen, clen); return; } 
@@ -84,7 +80,7 @@ public class WriterTextCellParallel extends WriterTextCell int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { Path newPath = new Path(path, String.format("0-m-%05d",i)); - tasks.add(new WriteTextTask(newPath, job, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); + tasks.add(new WriteTextTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); } //wait until all tasks have been executed @@ -105,18 +101,19 @@ public class WriterTextCellParallel extends WriterTextCell * * */ - private static class WriteTextTask implements Callable<Object> + private class WriteTextTask implements Callable<Object> { private JobConf _job = null; + private FileSystem _fs = null; private MatrixBlock _src = null; private Path _path =null; private int _rl = -1; private int _ru = -1; - public WriteTextTask(Path path, JobConf job, MatrixBlock src, int rl, int ru) - { + public WriteTextTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru) { _path = path; _job = job; + _fs = fs; _src = src; _rl = rl; _ru = ru; @@ -125,71 +122,7 @@ public class WriterTextCellParallel extends WriterTextCell @Override public Object call() throws Exception { - boolean entriesWritten = false; - FileSystem fs = FileSystem.get(_job); - BufferedWriter bw = null; - int cols = _src.getNumColumns(); - - try - { - //for obj reuse and preventing repeated buffer re-allocations - StringBuilder sb = new StringBuilder(); - bw = new BufferedWriter(new OutputStreamWriter(fs.create(_path,true))); - - if( _src.isInSparseFormat() ) //SPARSE - { - Iterator<IJV> iter = _src.getSparseBlockIterator(_rl, _ru); - - while( iter.hasNext() ) - { - IJV cell = iter.next(); - - sb.append(cell.getI()+1); - sb.append(' '); - sb.append(cell.getJ()+1); - sb.append(' '); - sb.append(cell.getV()); - sb.append('\n'); - bw.write( sb.toString() ); - sb.setLength(0); - entriesWritten = true; - } - } - else //DENSE - { - for( int i=_rl; 
i<_ru; i++ ) - { - String rowIndex = Integer.toString(i+1); - for( int j=0; j<cols; j++ ) - { - double lvalue = _src.getValueDenseUnsafe(i, j); - if( lvalue != 0 ) //for nnz - { - sb.append(rowIndex); - sb.append(' '); - sb.append( j+1 ); - sb.append(' '); - sb.append( lvalue ); - sb.append('\n'); - bw.write( sb.toString() ); - sb.setLength(0); - entriesWritten = true; - } - - } - } - } - - //handle empty result - if ( !entriesWritten ) { - bw.write("1 1 0\n"); - } - } - finally - { - IOUtilFunctions.closeSilently(bw); - } - + writeTextCellMatrixToFile(_path, _job, _fs, _src, _rl, _ru); return null; } }
