Repository: incubator-systemml
Updated Branches:
  refs/heads/master d5bb9cc2f -> 8a0df5b85


[SYSTEMML-566] Consolidated seq/par matrix writers (binary,text,mm,csv)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/29ad1434
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/29ad1434
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/29ad1434

Branch: refs/heads/master
Commit: 29ad1434ffdb5026f0c71f1526d42d489dd02f06
Parents: d5bb9cc
Author: Matthias Boehm <[email protected]>
Authored: Fri Jun 10 18:16:58 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jun 10 19:23:16 2016 -0700

----------------------------------------------------------------------
 .../sysml/runtime/io/WriterBinaryBlock.java     |  78 +++++----
 .../runtime/io/WriterBinaryBlockParallel.java   |  99 ++---------
 .../sysml/runtime/io/WriterMatrixMarket.java    |  76 ++++----
 .../runtime/io/WriterMatrixMarketParallel.java  | 110 ++----------
 .../apache/sysml/runtime/io/WriterTextCSV.java  |  49 ++++--
 .../sysml/runtime/io/WriterTextCSVParallel.java | 175 ++-----------------
 .../apache/sysml/runtime/io/WriterTextCell.java |  54 +++---
 .../runtime/io/WriterTextCellParallel.java      |  89 ++--------
 8 files changed, 199 insertions(+), 531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
index a3e0ebf..4ef1334 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
@@ -40,32 +40,36 @@ public class WriterBinaryBlock extends MatrixWriter
 {
        protected int _replication = -1;
        
-       public WriterBinaryBlock( int replication )
-       {
+       public WriterBinaryBlock( int replication ) {
                _replication  = replication;
        }
 
        @Override
-       public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, 
long clen, int brlen, int bclen, long nnz) 
+       public final void writeMatrixToHDFS(MatrixBlock src, String fname, long 
rlen, long clen, int brlen, int bclen, long nnz) 
                throws IOException, DMLRuntimeException 
        {
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
 
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
-                       
-               //core write
+
+               //set up preferred custom serialization framework for binary 
block format
+               if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
+                       
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
+               
+               //core write sequential/parallel
                if( src.isDiag() )
-                       writeDiagBinaryBlockMatrixToHDFS(path, job, src, rlen, 
clen, brlen, bclen, _replication);
+                       writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, 
rlen, clen, brlen, bclen);
                else
-                       writeBinaryBlockMatrixToHDFS(path, job, src, rlen, 
clen, brlen, bclen, _replication);
+                       writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, 
clen, brlen, bclen);
        }
 
        @Override
        @SuppressWarnings("deprecation")
-       public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, 
int brlen, int bclen) 
+       public final void writeEmptyMatrixToHDFS(String fname, long rlen, long 
clen, int brlen, int bclen) 
                throws IOException, DMLRuntimeException 
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
@@ -94,25 +98,42 @@ public class WriterBinaryBlock extends MatrixWriter
         * @throws IOException
         * @throws DMLRuntimeException 
         */
-       @SuppressWarnings("deprecation")
-       protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, 
MatrixBlock src, long rlen, long clen, int brlen, int bclen, int replication )
+       protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock src, long rlen, long clen, int brlen, int bclen )
                throws IOException, DMLRuntimeException
        {
+               //sequential write 
+               writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, brlen, 
bclen, 0, (int)rlen);
+       }
+       
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param fs
+        * @param src
+        * @param brlen
+        * @param bclen
+        * @param rl
+        * @param ru
+        * @throws DMLRuntimeException
+        * @throws IOException
+        */
+       @SuppressWarnings("deprecation")
+       protected final void writeBinaryBlockMatrixToSequenceFile( Path path, 
JobConf job, FileSystem fs, MatrixBlock src, int brlen, int bclen, int rl, int 
ru ) 
+               throws DMLRuntimeException, IOException
+       {
                boolean sparse = src.isInSparseFormat();
-               FileSystem fs = FileSystem.get(job);
-               
-               //set up preferred custom serialization framework for binary 
block format
-               if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-                       
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
+               int rlen = src.getNumRows();
+               int clen = src.getNumColumns();
                
                // 1) create sequence file writer, with right replication 
factor 
                // (config via MRConfigurationNames.DFS_REPLICATION not 
possible since sequence file internally calls fs.getDefaultReplication())
                SequenceFile.Writer writer = null;
-               if( replication > 0 ) //if replication specified (otherwise 
default)
+               if( _replication > 0 ) //if replication specified (otherwise 
default)
                {
                        //copy of SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class), except for replication
                        writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class, 
job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
-                                                                
(short)replication, fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata());      
+                                                                
(short)_replication, fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata());     
                }
                else    
                {
@@ -131,7 +152,7 @@ public class WriterBinaryBlock extends MatrixWriter
                        //3) reblock and write
                        MatrixIndexes indexes = new MatrixIndexes();
 
-                       if( rlen <= brlen && clen <= bclen ) //opt for single 
block
+                       if( rlen <= brlen && clen <= bclen && rl == 0 ) //opt 
for single block
                        {
                                //directly write single block
                                indexes.setIndexes(1, 1);
@@ -143,7 +164,7 @@ public class WriterBinaryBlock extends MatrixWriter
                                MatrixBlock[] blocks = 
createMatrixBlocksForReuse(rlen, clen, brlen, bclen, sparse, 
src.getNonZeros());  
                                
                                //create and write subblocks of matrix
-                               for(int blockRow = 0; blockRow < 
(int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
+                               for(int blockRow = rl/brlen; blockRow < 
(int)Math.ceil(ru/(double)brlen); blockRow++)
                                        for(int blockCol = 0; blockCol < 
(int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
                                        {
                                                int maxRow = (blockRow*brlen + 
brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
@@ -168,8 +189,7 @@ public class WriterBinaryBlock extends MatrixWriter
                                        }
                        }
                }
-               finally
-               {
+               finally {
                        IOUtilFunctions.closeSilently(writer);
                }
        }
@@ -188,24 +208,19 @@ public class WriterBinaryBlock extends MatrixWriter
         * @throws DMLRuntimeException 
         */
        @SuppressWarnings("deprecation")
-       protected void writeDiagBinaryBlockMatrixToHDFS( Path path, JobConf 
job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, int 
replication ) 
+       protected final void writeDiagBinaryBlockMatrixToHDFS( Path path, 
JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int brlen, 
int bclen ) 
                throws IOException, DMLRuntimeException
        {
                boolean sparse = src.isInSparseFormat();
-               FileSystem fs = FileSystem.get(job);
-               
-               //set up preferred custom serialization framework for binary 
block format
-               if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-                       
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
                
                // 1) create sequence file writer, with right replication 
factor 
                // (config via MRConfigurationNames.DFS_REPLICATION not 
possible since sequence file internally calls fs.getDefaultReplication())
                SequenceFile.Writer writer = null;
-               if( replication > 0 ) //if replication specified (otherwise 
default)
+               if( _replication > 0 ) //if replication specified (otherwise 
default)
                {
                        //copy of SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class), except for replication
                        writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class, 
job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
-                                                                
(short)replication, fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata());      
+                                                                
(short)_replication, fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata());     
                }
                else    
                {
@@ -272,8 +287,7 @@ public class WriterBinaryBlock extends MatrixWriter
                                        }
                        }                               
                }
-               finally
-               {
+               finally {
                        IOUtilFunctions.closeSilently(writer);
                }
        }
@@ -292,7 +306,7 @@ public class WriterBinaryBlock extends MatrixWriter
         * @throws DMLRuntimeException
         */
        @SuppressWarnings("deprecation")
-       public void writePartitionedBinaryBlockMatrixToHDFS( Path path, JobConf 
job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, 
PDataPartitionFormat pformat )
+       public final void writePartitionedBinaryBlockMatrixToHDFS( Path path, 
JobConf job, MatrixBlock src, long rlen, long clen, int brlen, int bclen, 
PDataPartitionFormat pformat )
                        throws IOException, DMLRuntimeException
        {
                boolean sparse = src.isInSparseFormat();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
index b316f8b..16e01a3 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
@@ -29,44 +29,27 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
-import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 public class WriterBinaryBlockParallel extends WriterBinaryBlock
 {
-       public WriterBinaryBlockParallel( int replication )
-       {
+       public WriterBinaryBlockParallel( int replication ) {
                super(replication);
        }
        
-       /**
-        * 
-        * @param path
-        * @param job
-        * @param src
-        * @param rlen
-        * @param clen
-        * @param brlen
-        * @param bclen
-        * @throws IOException
-        * @throws DMLRuntimeException 
-        */
        @Override
-       protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, 
MatrixBlock src, long rlen, long clen, int brlen, int bclen, int replication )
+       protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock src, long rlen, long clen, int brlen, int bclen )
                throws IOException, DMLRuntimeException
        {
                //estimate output size and number of output blocks (min 1)
-               int numPartFiles = 
(int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, brlen, 
bclen, src.getNonZeros()) 
-                                                  / 
InfrastructureAnalyzer.getHDFSBlockSize());
+               int numPartFiles = 
(int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, 
+                               brlen, bclen, src.getNonZeros()) / 
InfrastructureAnalyzer.getHDFSBlockSize());
                numPartFiles = Math.max(numPartFiles, 1);
                
                //determine degree of parallelism
@@ -75,17 +58,12 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
                if( numThreads <= 1 ) {
-                       super.writeBinaryBlockMatrixToHDFS(path, job, src, 
rlen, clen, brlen, bclen, replication);
+                       super.writeBinaryBlockMatrixToHDFS(path, job, fs, src, 
rlen, clen, brlen, bclen);
                        return;
                }
-                       
-               //set up preferred custom serialization framework for binary 
block format
-               if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-                       
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
 
                //create directory for concurrent tasks
                MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
-               FileSystem fs = FileSystem.get(job);
                
                //create and execute write tasks
                try 
@@ -95,7 +73,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
                        int blklen = (int)Math.ceil((double)rlen / brlen / 
numThreads) * brlen;
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {
                                Path newPath = new Path(path, 
String.format("0-m-%05d",i));
-                               tasks.add(new WriteFileTask(newPath, job, fs, 
src, i*blklen, Math.min((i+1)*blklen, rlen), brlen, bclen, _replication));
+                               tasks.add(new WriteFileTask(newPath, job, fs, 
src, i*blklen, Math.min((i+1)*blklen, rlen), brlen, bclen));
                        }
 
                        //wait until all tasks have been executed
@@ -114,7 +92,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
        /**
         * 
         */
-       private static class WriteFileTask implements Callable<Object> 
+       private class WriteFileTask implements Callable<Object> 
        {
                private Path _path = null;
                private JobConf _job = null;
@@ -124,10 +102,8 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
                private long _ru = -1;
                private int _brlen = -1;
                private int _bclen = -1;
-               private int _replication = 1;
                
-               public WriteFileTask(Path path, JobConf job, FileSystem fs, 
MatrixBlock src, long rl, long ru, int brlen, int bclen, int rep)
-               {
+               public WriteFileTask(Path path, JobConf job, FileSystem fs, 
MatrixBlock src, long rl, long ru, int brlen, int bclen) {
                        _path = path;
                        _fs = fs;
                        _job = job;
@@ -136,66 +112,13 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
                        _ru = ru;
                        _brlen = brlen;
                        _bclen = bclen;
-                       _replication = rep;
                }
        
                @Override
-               @SuppressWarnings("deprecation")
-               public Object call() throws Exception 
+               public Object call() 
+                       throws Exception 
                {
-                       // 1) create sequence file writer, with right 
replication factor 
-                       // (config via MRConfigurationNames.DFS_REPLICATION not 
possible since sequence file internally calls fs.getDefaultReplication())
-                       SequenceFile.Writer writer = null;
-                       if( _replication > 0 ) //if replication specified 
(otherwise default)
-                       {
-                               //copy of SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class), except for replication
-                               writer = new SequenceFile.Writer(_fs, _job, 
_path, MatrixIndexes.class, MatrixBlock.class, 
_job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
-                                                                        
(short)_replication, _fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata());
-                       }
-                       else    
-                       {
-                               writer = new SequenceFile.Writer(_fs, _job, 
_path, MatrixIndexes.class, MatrixBlock.class);
-                       }
-                       
-                       try
-                       {
-                               //3) reblock and write
-                               MatrixIndexes indexes = new MatrixIndexes();
-
-                               //initialize blocks for reuse (at most 4 
different blocks required)
-                               MatrixBlock[] blocks = 
createMatrixBlocksForReuse(_src.getNumRows(), _src.getNumColumns(),
-                                               _brlen, _bclen, 
_src.isInSparseFormat(), _src.getNonZeros());  
-                                       
-                               //create and write subblocks of matrix
-                               for(int blockRow = (int)_rl/_brlen; blockRow < 
(int)Math.ceil(_ru/(double)_brlen); blockRow++)
-                                       for(int blockCol = 0; blockCol < 
(int)Math.ceil(_src.getNumColumns()/(double)_bclen); blockCol++)
-                                       {
-                                               int maxRow = (blockRow*_brlen + 
_brlen < _src.getNumRows()) ? _brlen : _src.getNumRows() - blockRow*_brlen;
-                                               int maxCol = (blockCol*_bclen + 
_bclen < _src.getNumColumns()) ? _bclen : _src.getNumColumns() - 
blockCol*_bclen;
-                               
-                                               int row_offset = 
blockRow*_brlen;
-                                               int col_offset = 
blockCol*_bclen;
-                                               
-                                               //get reuse matrix block
-                                               MatrixBlock block = 
getMatrixBlockForReuse(blocks, maxRow, maxCol, _brlen, _bclen);
-       
-                                               //copy submatrix to block
-                                               _src.sliceOperations( 
row_offset, row_offset+maxRow-1, 
-                                                                            
col_offset, col_offset+maxCol-1, block );
-                                               
-                                               //append block to sequence file
-                                               indexes.setIndexes(blockRow+1, 
blockCol+1);
-                                               writer.append(indexes, block);
-                                                       
-                                               //reset block for later reuse
-                                               block.reset();
-                                       }
-                       }
-                       finally
-                       {
-                               IOUtilFunctions.closeSilently(writer);
-                       }       
-                       
+                       writeBinaryBlockMatrixToSequenceFile(_path, _job, _fs, 
_src, _brlen, _bclen, (int)_rl, (int)_ru);
                        return null;
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
index 42d504a..9ae359a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
@@ -45,7 +45,7 @@ import org.apache.sysml.runtime.util.MapReduceTool;
 public class WriterMatrixMarket extends MatrixWriter
 {
        @Override
-       public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, 
long clen, int brlen, int bclen, long nnz) 
+       public final void writeMatrixToHDFS(MatrixBlock src, String fname, long 
rlen, long clen, int brlen, int bclen, long nnz) 
                throws IOException, DMLRuntimeException 
        {
                //validity check matrix dimensions
@@ -55,17 +55,18 @@ public class WriterMatrixMarket extends MatrixWriter
                                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
 
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
                        
                //core write
-               writeMatrixMarketMatrixToHDFS(path, job, src, rlen, clen, nnz);
+               writeMatrixMarketMatrixToHDFS(path, job, fs, src);
        }
 
        @Override
-       public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, 
int brlen, int bclen) 
+       public final void writeEmptyMatrixToHDFS(String fname, long rlen, long 
clen, int brlen, int bclen) 
                throws IOException, DMLRuntimeException 
        {
                Path path = new Path( fname );
@@ -85,41 +86,51 @@ public class WriterMatrixMarket extends MatrixWriter
         * @param nnz
         * @throws IOException
         */
-       protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, 
MatrixBlock src, long rlen, long clen, long nnz )
+       protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock src )
+               throws IOException
+       {
+               //sequential write
+               writeMatrixMarketMatrixToFile(path, job, fs, src, 0, 
src.getNumRows());
+       }
+       
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param src
+        * @param rl
+        * @param ru
+        * @throws IOException
+        */
+       protected final void writeMatrixMarketMatrixToFile( Path path, JobConf 
job, FileSystem fs, MatrixBlock src, int rl, int ru )
                throws IOException
        {
                boolean sparse = src.isInSparseFormat();
-               boolean entriesWritten = false;
-               FileSystem fs = FileSystem.get(job);
-        BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));            
-        
-       int rows = src.getNumRows();
-               int cols = src.getNumColumns();
-
-               //bound check per block
-               if( rows > rlen || cols > clen )
-               {
-                       throw new IOException("Matrix block 
[1:"+rows+",1:"+cols+"] " +
-                                                     "out of overall matrix 
range [1:"+rlen+",1:"+clen+"].");
-               }
+               int rlen = src.getNumRows();
+               int clen = src.getNumColumns();
+               long nnz = src.getNonZeros();
                
+               BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));             
+
                try
                {
                        //for obj reuse and preventing repeated buffer 
re-allocations
                        StringBuilder sb = new StringBuilder();
                        
-                       // First output MM header
-                       sb.append ("%%MatrixMarket matrix coordinate real 
general\n");
-               
-                       // output number of rows, number of columns and number 
of nnz
-                       sb.append (rlen + " " + clen + " " + nnz + "\n");
-            br.write( sb.toString());
-            sb.setLength(0);
-            
+                       if( rl == 0 ) {
+                               // First output MM header
+                               sb.append ("%%MatrixMarket matrix coordinate 
real general\n");
+                       
+                               // output number of rows, number of columns and 
number of nnz
+                               sb.append (rlen + " " + clen + " " + nnz + 
"\n");
+                               br.write( sb.toString());
+                               sb.setLength(0);
+                       }
+                        
             // output matrix cell
                        if( sparse ) //SPARSE
                        {                          
-                               Iterator<IJV> iter = 
src.getSparseBlockIterator();
+                               Iterator<IJV> iter = 
src.getSparseBlockIterator(rl, ru);
                                while( iter.hasNext() )
                                {
                                        IJV cell = iter.next();
@@ -132,15 +143,14 @@ public class WriterMatrixMarket extends MatrixWriter
                                        sb.append('\n');
                                        br.write( sb.toString() ); //same as 
append
                                        sb.setLength(0); 
-                                       entriesWritten = true;                  
                
                                }
                        }
                        else //DENSE
                        {
-                               for( int i=0; i<rows; i++ )
+                               for( int i=rl; i<ru; i++ )
                                {
                                        String rowIndex = 
Integer.toString(i+1);                                        
-                                       for( int j=0; j<cols; j++ )
+                                       for( int j=0; j<clen; j++ )
                                        {
                                                double lvalue = 
src.getValueDenseUnsafe(i, j);
                                                if( lvalue != 0 ) //for nnz
@@ -153,19 +163,17 @@ public class WriterMatrixMarket extends MatrixWriter
                                                        sb.append('\n');
                                                        br.write( sb.toString() 
); //same as append
                                                        sb.setLength(0); 
-                                                       entriesWritten = true;
                                                }
                                        }
                                }
                        }
        
                        //handle empty result
-                       if ( !entriesWritten ) {
+                       if ( src.isEmptyBlock(false) && rl==0 ) {
                                br.write("1 1 0\n");
                        }
                }
-               finally
-               {
+               finally {
                        IOUtilFunctions.closeSilently(br);
                }
        }
@@ -179,7 +187,7 @@ public class WriterMatrixMarket extends MatrixWriter
         * @param nnz
         * @throws IOException
         */
-       public void mergeTextcellToMatrixMarket( String srcFileName, String 
fileName, long rlen, long clen, long nnz )
+       public final void mergeTextcellToMatrixMarket( String srcFileName, 
String fileName, long rlen, long clen, long nnz )
                throws IOException
        {
                  Configuration conf = new 
Configuration(ConfigurationManager.getCachedJobConf());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
index 6e9a762..34d392e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
@@ -19,11 +19,8 @@
 
 package org.apache.sysml.runtime.io;
 
-import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -36,7 +33,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.hops.OptimizerUtils;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.util.MapReduceTool;
@@ -46,22 +42,15 @@ import org.apache.sysml.runtime.util.MapReduceTool;
  */
 public class WriterMatrixMarketParallel extends WriterMatrixMarket
 {
-       /**
-        * 
-        * @param fileName
-        * @param src
-        * @param rlen
-        * @param clen
-        * @param nnz
-        * @throws IOException
-        */
        @Override
-       protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, 
MatrixBlock src, long rlen, long clen, long nnz )
+       protected void writeMatrixMarketMatrixToHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock src )
                throws IOException
        {
+               int rlen = src.getNumRows();
+               
                //estimate output size and number of output blocks (min 1)
-               int numPartFiles = 
(int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), 
src.getNumColumns(), src.getNonZeros(), 
-                                             
OutputInfo.MatrixMarketOutputInfo)  / 
InfrastructureAnalyzer.getHDFSBlockSize());
+               int numPartFiles = 
(int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), 
src.getNumColumns(), 
+                               src.getNonZeros(), 
OutputInfo.MatrixMarketOutputInfo)  / 
InfrastructureAnalyzer.getHDFSBlockSize());
                numPartFiles = Math.max(numPartFiles, 1);
                
                //determine degree of parallelism
@@ -70,7 +59,7 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
                if( numThreads <= 1 ) {
-                       super.writeMatrixMarketMatrixToHDFS(path, job, src, 
rlen, clen, nnz);
+                       super.writeMatrixMarketMatrixToHDFS(path, job, fs, src);
                        return;
                }
                
@@ -85,7 +74,7 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket
                        int blklen = (int)Math.ceil((double)rlen / numThreads);
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {
                                Path newPath = new Path(path, 
String.format("0-m-%05d",i));
-                               tasks.add(new WriteMMTask(newPath, job, src, 
i*blklen, (int)Math.min((i+1)*blklen, rlen)));
+                               tasks.add(new WriteMMTask(newPath, job, fs, 
src, i*blklen, (int)Math.min((i+1)*blklen, rlen)));
                        }
 
                        //wait until all tasks have been executed
@@ -105,18 +94,19 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket
         * 
         * 
         */
-       private static class WriteMMTask implements Callable<Object> 
+       private class WriteMMTask implements Callable<Object> 
        {
                private JobConf _job = null;
+               private FileSystem _fs = null;
                private MatrixBlock _src = null;
                private Path _path =null;
                private int _rl = -1;
                private int _ru = -1;
 
-               public WriteMMTask(Path path, JobConf job, MatrixBlock src, int 
rl, int ru)
-               {
+               public WriteMMTask(Path path, JobConf job, FileSystem fs, 
MatrixBlock src, int rl, int ru) {
                        _path = path;
                        _job = job;
+                       _fs = fs;
                        _src = src;
                        _rl = rl;
                        _ru = ru;
@@ -125,83 +115,7 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket
                @Override
                public Object call() throws Exception 
                {
-                       boolean entriesWritten = false;
-                       FileSystem fs = FileSystem.get(_job);
-                       BufferedWriter bw = null;
-                       
-                       int rows = _src.getNumRows();
-               int cols = _src.getNumColumns();
-               long nnz = _src.getNonZeros();
-               
-                       try
-                       {
-                               //for obj reuse and preventing repeated buffer 
re-allocations
-                               StringBuilder sb = new StringBuilder();
-                       bw = new BufferedWriter(new 
OutputStreamWriter(fs.create(_path,true)));
-                               
-                       if( _rl == 0 ) {
-                                       // First output MM header
-                                       sb.append ("%%MatrixMarket matrix 
coordinate real general\n");
-                               
-                                       // output number of rows, number of 
columns and number of nnz
-                                       sb.append (rows + " " + cols + " " + 
nnz + "\n");
-                           bw.write( sb.toString());
-                           sb.setLength(0);                        
-                       }
-                       
-                               if( _src.isInSparseFormat() ) //SPARSE
-                               {                          
-                                       Iterator<IJV> iter = 
_src.getSparseBlockIterator(_rl, _ru);
-
-                                       while( iter.hasNext() )
-                                       {
-                                               IJV cell = iter.next();
-
-                                               sb.append(cell.getI()+1);
-                                               sb.append(' ');
-                                               sb.append(cell.getJ()+1);
-                                               sb.append(' ');
-                                               sb.append(cell.getV());
-                                               sb.append('\n');
-                                               bw.write( sb.toString() );
-                                               sb.setLength(0); 
-                                               entriesWritten = true;
-                                       }
-                               }
-                               else //DENSE
-                               {
-                                       for( int i=_rl; i<_ru; i++ )
-                                       {
-                                               String rowIndex = 
Integer.toString(i+1);                                        
-                                               for( int j=0; j<cols; j++ )
-                                               {
-                                                       double lvalue = 
_src.getValueDenseUnsafe(i, j);
-                                                       if( lvalue != 0 ) //for 
nnz
-                                                       {
-                                                               
sb.append(rowIndex);
-                                                               sb.append(' ');
-                                                               sb.append( j+1 
);
-                                                               sb.append(' ');
-                                                               sb.append( 
lvalue );
-                                                               sb.append('\n');
-                                                               bw.write( 
sb.toString() );
-                                                               
sb.setLength(0); 
-                                                               entriesWritten 
= true;
-                                                       }
-                                               }
-                                       }
-                               }
-                               
-                               //handle empty result
-                               if ( !entriesWritten ) {
-                               bw.write("1 1 0\n");
-                               }
-                       }
-                       finally
-                       {
-                               IOUtilFunctions.closeSilently(bw);
-                       }
-                       
+                       writeMatrixMarketMatrixToFile(_path, _job, _fs, _src, 
_rl, _ru);
                        return null;
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
index f18a8ec..7e2ce90 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
@@ -58,7 +58,7 @@ public class WriterTextCSV extends MatrixWriter
        }
        
        @Override
-       public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, 
long clen, int brlen, int bclen, long nnz) 
+       public final void writeMatrixToHDFS(MatrixBlock src, String fname, long 
rlen, long clen, int brlen, int bclen, long nnz) 
                throws IOException, DMLRuntimeException 
        {
                //validity check matrix dimensions
@@ -68,24 +68,42 @@ public class WriterTextCSV extends MatrixWriter
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
 
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
                        
-               //core write
-               writeCSVMatrixToHDFS(path, job, src, rlen, clen, nnz, _props);
+               //core write (sequential/parallel)
+               writeCSVMatrixToHDFS(path, job, fs, src, _props);
        }
 
        @Override
-       public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, 
int brlen, int bclen) 
+       public final void writeEmptyMatrixToHDFS(String fname, long rlen, long 
clen, int brlen, int bclen) 
                throws IOException, DMLRuntimeException 
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
 
                MatrixBlock src = new MatrixBlock((int)rlen, 1, true);
-               writeCSVMatrixToHDFS(path, job, src, brlen, clen, 0, _props);
+               writeCSVMatrixToHDFS(path, job, fs, src, _props);
+       }
+       
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param fs
+        * @param src
+        * @param csvprops
+        * @throws IOException 
+        */
+       protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem 
fs, MatrixBlock src, CSVFileFormatProperties csvprops) 
+               throws IOException 
+       {
+               //sequential write csv file
+               writeCSVMatrixToFile(path, job, fs, src, 0, 
(int)src.getNumRows(), csvprops);
        }
        
        /**
@@ -97,12 +115,14 @@ public class WriterTextCSV extends MatrixWriter
         * @param nnz
         * @throws IOException
         */
-       protected void writeCSVMatrixToHDFS( Path path, JobConf job, 
MatrixBlock src, long rlen, long clen, long nnz, CSVFileFormatProperties props )
+       protected final void writeCSVMatrixToFile( Path path, JobConf job, 
FileSystem fs, MatrixBlock src, int rl, int ru, CSVFileFormatProperties props )
                throws IOException
        {
                boolean sparse = src.isInSparseFormat();
-               FileSystem fs = FileSystem.get(job);
-        BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));            
+               int clen = src.getNumColumns();
+               
+               //create buffered writer
+               BufferedWriter br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));           
                
                try
                {
@@ -114,7 +134,7 @@ public class WriterTextCSV extends MatrixWriter
                        boolean csvsparse = props.isSparse();
                        
                        // Write header line, if needed
-                       if( props.hasHeader() ) 
+                       if( props.hasHeader() && rl==0 ) 
                        {
                                //write row chunk-wise to prevent OOM on large 
number of columns
                                for( int bj=0; bj<clen; bj+=BLOCKSIZE_J )
@@ -137,7 +157,7 @@ public class WriterTextCSV extends MatrixWriter
                        if( sparse ) //SPARSE
                        {       
                                SparseBlock sblock = src.getSparseBlock();
-                               for(int i=0; i < rlen; i++) 
+                               for(int i=rl; i < ru; i++) 
                    {
                                        //write row chunk-wise to prevent OOM 
on large number of columns
                                        int prev_jix = -1;
@@ -204,7 +224,7 @@ public class WriterTextCSV extends MatrixWriter
                        }
                        else //DENSE
                        {
-                               for( int i=0; i<rlen; i++ ) 
+                               for( int i=rl; i<ru; i++ ) 
                                {
                                        //write row chunk-wise to prevent OOM 
on large number of columns
                                        for( int bj=0; bj<clen; bj+=BLOCKSIZE_J 
)
@@ -230,8 +250,7 @@ public class WriterTextCSV extends MatrixWriter
                                }
                        }
                }
-               finally
-               {
+               finally {
                        IOUtilFunctions.closeSilently(br);
                }
        }
@@ -251,7 +270,7 @@ public class WriterTextCSV extends MatrixWriter
         * @param clen
         * @throws IOException
         */
-       public void mergeCSVPartFiles(String srcFileName, String destFileName, 
CSVFileFormatProperties csvprop, long rlen, long clen) 
+       public final void mergeCSVPartFiles(String srcFileName, String 
destFileName, CSVFileFormatProperties csvprop, long rlen, long clen) 
                throws IOException 
        {       
                Configuration conf = new 
Configuration(ConfigurationManager.getCachedJobConf());
@@ -331,7 +350,7 @@ public class WriterTextCSV extends MatrixWriter
         * @throws IOException
         */
        @SuppressWarnings("unchecked")
-       public void addHeaderToCSV(String srcFileName, String destFileName, 
long rlen, long clen) 
+       public final void addHeaderToCSV(String srcFileName, String 
destFileName, long rlen, long clen) 
                throws IOException 
        {
                Configuration conf = new 
Configuration(ConfigurationManager.getCachedJobConf());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
index cd400fe..01ad579 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
@@ -19,9 +19,7 @@
 
 package org.apache.sysml.runtime.io;
 
-import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -38,7 +36,6 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
-import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
@@ -50,22 +47,12 @@ public class WriterTextCSVParallel extends WriterTextCSV
                super( props );
        }
 
-       /**
-        * 
-        * @param fileName
-        * @param src
-        * @param rlen
-        * @param clen
-        * @param nnz
-        * @throws IOException
-        */
-       @Override
-       protected void writeCSVMatrixToHDFS( Path path, JobConf job, 
MatrixBlock src, long rlen, long clen, long nnz, CSVFileFormatProperties props )
-               throws IOException
+       protected void writeCSVMatrixToHDFS(Path path, JobConf job, FileSystem 
fs, MatrixBlock src, CSVFileFormatProperties csvprops) 
+               throws IOException 
        {
                //estimate output size and number of output blocks (min 1)
-               int numPartFiles = 
(int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), 
src.getNumColumns(), src.getNonZeros(), 
-                                             OutputInfo.CSVOutputInfo)  / 
InfrastructureAnalyzer.getHDFSBlockSize());
+               int numPartFiles = 
(int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), 
src.getNumColumns(), 
+                               src.getNonZeros(), OutputInfo.CSVOutputInfo)  / 
InfrastructureAnalyzer.getHDFSBlockSize());
                numPartFiles = Math.max(numPartFiles, 1);
                
                //determine degree of parallelism
@@ -74,7 +61,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
        
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
                if( numThreads <= 1 ) {
-                       super.writeCSVMatrixToHDFS(path, job, src, rlen, clen, 
nnz, props);
+                       super.writeCSVMatrixToHDFS(path, job, fs, src, 
csvprops);
                        return;
                }
                
@@ -86,10 +73,11 @@ public class WriterTextCSVParallel extends WriterTextCSV
                {
                        ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
                        ArrayList<WriteCSVTask> tasks = new 
ArrayList<WriteCSVTask>();
+                       int rlen = src.getNumRows();
                        int blklen = (int)Math.ceil((double)rlen / numThreads);
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {
                                Path newPath = new Path(path, 
String.format("0-m-%05d",i));
-                               tasks.add(new WriteCSVTask(newPath, job, src, 
i*blklen, (int)Math.min((i+1)*blklen, rlen), props));
+                               tasks.add(new WriteCSVTask(newPath, job, fs, 
src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops));
                        }
 
                        //wait until all tasks have been executed
@@ -102,27 +90,28 @@ public class WriterTextCSVParallel extends WriterTextCSV
                } 
                catch (Exception e) {
                        throw new IOException("Failed parallel write of csv 
output.", e);
-               }
-       }
+               }               
+       }       
 
        
        /**
         * 
         * 
         */
-       private static class WriteCSVTask implements Callable<Object> 
+       private class WriteCSVTask implements Callable<Object> 
        {
                private JobConf _job = null;
+               private FileSystem _fs = null;
                private MatrixBlock _src = null;
                private Path _path =null;
                private int _rl = -1;
                private int _ru = -1;
                private CSVFileFormatProperties _props = null;
                
-               public WriteCSVTask(Path path, JobConf job, MatrixBlock src, 
int rl, int ru, CSVFileFormatProperties props)
-               {
+               public WriteCSVTask(Path path, JobConf job, FileSystem fs, 
MatrixBlock src, int rl, int ru, CSVFileFormatProperties props) {
                        _path = path;
                        _job = job;
+                       _fs = fs;
                        _src = src;
                        _rl = rl;
                        _ru = ru;
@@ -132,143 +121,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
                @Override
                public Object call() throws Exception 
                {
-                       FileSystem _fs = FileSystem.get(_job);
-               BufferedWriter bw = null;
-               
-                       boolean sparse = _src.isInSparseFormat();
-                       int cols = _src.getNumColumns();
-
-                       try
-                       {
-                               //for obj reuse and preventing repeated buffer 
re-allocations
-                               StringBuilder sb = new StringBuilder();
-                               bw = new BufferedWriter(new 
OutputStreamWriter(_fs.create(_path,true)));
-                               
-                               _props = (_props==null)? new 
CSVFileFormatProperties() : _props;
-                               String delim = _props.getDelim(); 
//Pattern.quote(csvProperties.getDelim());
-                               boolean csvsparse = _props.isSparse();
-                               
-                               // Write header line, if needed
-                               if( _props.hasHeader() && _rl == 0 ) 
-                               {
-                                       //write row chunk-wise to prevent OOM 
on large number of columns
-                                       for( int bj=0; bj<cols; 
bj+=WriterTextCSV.BLOCKSIZE_J )
-                                       {
-                                               for( int j=bj; j < 
Math.min(cols,bj+WriterTextCSV.BLOCKSIZE_J); j++) 
-                                               {
-                                                       sb.append("C"+ (j+1));
-                                                       if ( j < cols-1 )
-                                                               
sb.append(delim);
-                                               }
-                                               bw.write( sb.toString() );
-                                   sb.setLength(0);    
-                                       }
-                                       sb.append('\n');
-                                       bw.write( sb.toString() );
-                           sb.setLength(0);
-                               }
-                               
-                               // Write data lines
-                               if( sparse ) //SPARSE
-                               {       
-                                       SparseBlock sblock = 
_src.getSparseBlock();
-                                       for( int i=_rl; i<_ru; i++ )
-                                       {
-                                               //write row chunk-wise to 
prevent OOM on large number of columns
-                                               int prev_jix = -1;
-                                               if(    sblock!=null && 
i<sblock.numRows() 
-                                                       && !sblock.isEmpty(i) )
-                                               {
-                                                       int pos = sblock.pos(i);
-                                                       int alen = 
sblock.size(i);
-                                                       int[] aix = 
sblock.indexes(i);
-                                                       double[] avals = 
sblock.values(i);
-                                                       
-                                                       for(int j=pos; 
j<pos+alen; j++) 
-                                                       {
-                                                               int jix = 
aix[j];
-                                                               
-                                                               // output empty 
fields, if needed
-                                                               for( int 
j2=prev_jix; j2<jix-1; j2++ ) {
-                                                                       if( 
!csvsparse )
-                                                                               
sb.append('0');
-                                                                       
sb.append(delim);
-                                                               
-                                                                       //flush 
buffered string
-                                                           if( 
j2%WriterTextCSV.BLOCKSIZE_J==0 ){
-                                                                               
bw.write( sb.toString() );
-                                                                   
sb.setLength(0);
-                                                           }
-                                                               }
-                                                               
-                                                               // output the 
value (non-zero)
-                                                               sb.append( 
avals[j] );
-                                                               if( jix < 
cols-1)
-                                                                       
sb.append(delim);
-                                                               bw.write( 
sb.toString() );
-                                                   sb.setLength(0);
-                                                   
-                                                   //flush buffered string
-                                                   if( 
jix%WriterTextCSV.BLOCKSIZE_J==0 ){
-                                                                       
bw.write( sb.toString() );
-                                                           sb.setLength(0);
-                                                   }
-                                                   
-                                                               prev_jix = jix;
-                                                       }
-                                               }
-                                               
-                                               // Output empty fields at the 
end of the row.
-                                               // In case of an empty row, 
output (clen-1) empty fields
-                                               for( int bj=prev_jix+1; 
bj<cols; bj+=WriterTextCSV.BLOCKSIZE_J )
-                                               {
-                                                       for( int j = bj; j < 
Math.min(cols,bj+WriterTextCSV.BLOCKSIZE_J); j++) {
-                                                               if( !csvsparse )
-                                                                       
sb.append('0');
-                                                               if( j < cols-1 )
-                                                                       
sb.append(delim);
-                                                       }
-                                                       bw.write( sb.toString() 
);
-                                           sb.setLength(0);    
-                                               }
-
-                                               sb.append('\n');
-                                               bw.write( sb.toString() ); 
-                                               sb.setLength(0); 
-                                       }
-                               }
-                               else //DENSE
-                               {
-                                       for( int i=_rl; i<_ru; i++ )
-                                       {
-                                               //write row chunk-wise to 
prevent OOM on large number of columns
-                                               for( int bj=0; bj<cols; 
bj+=WriterTextCSV.BLOCKSIZE_J )
-                                               {
-                                                       for( int j=bj; 
j<Math.min(cols,bj+WriterTextCSV.BLOCKSIZE_J); j++ )
-                                                       {
-                                                               double lvalue = 
_src.getValueDenseUnsafe(i, j);
-                                                               if( lvalue != 0 
) //for nnz
-                                                                       
sb.append(lvalue);
-                                                               else if( 
!csvsparse ) 
-                                                                       
sb.append('0');
-                                                               
-                                                               if( j != cols-1 
)
-                                                                       
sb.append(delim);
-                                                       }
-                                                       bw.write( sb.toString() 
);
-                                           sb.setLength(0);
-                                               }
-                                               
-                                               sb.append('\n');
-                                               bw.write( sb.toString() ); 
//same as append
-                                               sb.setLength(0); 
-                                       }
-                               }
-                       }
-                       finally
-                       {
-                               IOUtilFunctions.closeSilently(bw);
-                       }                       
+                       writeCSVMatrixToFile(_path, _job, _fs, _src, _rl, _ru, 
_props);
                        return null;
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
index dd421ed..e32172e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
@@ -37,7 +37,7 @@ import org.apache.sysml.runtime.util.MapReduceTool;
 public class WriterTextCell extends MatrixWriter
 {
        @Override
-       public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, 
long clen, int brlen, int bclen, long nnz) 
+       public final void writeMatrixToHDFS(MatrixBlock src, String fname, long 
rlen, long clen, int brlen, int bclen, long nnz) 
                throws IOException, DMLRuntimeException 
        {
                //validity check matrix dimensions
@@ -47,17 +47,18 @@ public class WriterTextCell extends MatrixWriter
                                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
 
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
                        
                //core write
-               writeTextCellMatrixToHDFS(path, job, src, rlen, clen);
+               writeTextCellMatrixToHDFS(path, job, fs, src, rlen, clen);
        }
 
        @Override
-       public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) 
+       public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int brlen, int bclen) 
                throws IOException, DMLRuntimeException 
        {
                Path path = new Path( fname );
@@ -79,24 +80,30 @@ public class WriterTextCell extends MatrixWriter
         * @param bclen
         * @throws IOException
         */
-       protected void writeTextCellMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen )
+       protected void writeTextCellMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen )
+               throws IOException
+       {
+               //sequential write text cell file
+               writeTextCellMatrixToFile(path, job, fs, src, 0, (int)rlen);
+       }
+       
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param src
+        * @param rl
+        * @param ru
+        * @throws IOException
+        */
+       protected final void writeTextCellMatrixToFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru )
                throws IOException
        {
                boolean sparse = src.isInSparseFormat();
-               boolean entriesWritten = false;
-               FileSystem fs = FileSystem.get(job);
-        BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));          
+               int clen = src.getNumColumns();
                
-       int rows = src.getNumRows();
-               int cols = src.getNumColumns();
+               BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));           
 
-               //bound check per block
-               if( rows > rlen || cols > clen )
-               {
-                       throw new IOException("Matrix block [1:"+rows+",1:"+cols+"] " +
-                                                     "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-               }
-               
                try
                {
                        //for obj reuse and preventing repeated buffer re-allocations
@@ -104,7 +111,7 @@ public class WriterTextCell extends MatrixWriter
                        
                        if( sparse ) //SPARSE
                        {                          
-                               Iterator<IJV> iter = src.getSparseBlockIterator();
+                               Iterator<IJV> iter = src.getSparseBlockIterator(rl, ru);
                                while( iter.hasNext() )
                                {
                                        IJV cell = iter.next();
@@ -117,15 +124,14 @@ public class WriterTextCell extends MatrixWriter
                                        sb.append('\n');
                                        br.write( sb.toString() ); //same as append
                                        sb.setLength(0); 
-                                       entriesWritten = true;                  
                
                                }
                        }
                        else //DENSE
                        {
-                               for( int i=0; i<rows; i++ )
+                               for( int i=rl; i<ru; i++ )
                                {
                                        String rowIndex = Integer.toString(i+1);                                        
-                                       for( int j=0; j<cols; j++ )
+                                       for( int j=0; j<clen; j++ )
                                        {
                                                double lvalue = src.getValueDenseUnsafe(i, j);
                                                if( lvalue != 0 ) //for nnz
@@ -138,7 +144,6 @@ public class WriterTextCell extends MatrixWriter
                                                        sb.append('\n');
                                                        br.write( sb.toString() ); //same as append
                                                        sb.setLength(0); 
-                                                       entriesWritten = true;
                                                }
                                                
                                        }
@@ -146,13 +151,12 @@ public class WriterTextCell extends MatrixWriter
                        }
        
                        //handle empty result
-                       if ( !entriesWritten ) {
+                       if ( src.isEmptyBlock(false) && rl==0 ) {
                                br.write("1 1 0\n");
                        }
                }
-               finally
-               {
+               finally {
                        IOUtilFunctions.closeSilently(br);
                }
-       }       
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/29ad1434/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
index b6d3c33..b758435 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
@@ -19,11 +19,8 @@
 
 package org.apache.sysml.runtime.io;
 
-import java.io.BufferedWriter;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -36,7 +33,6 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.util.MapReduceTool;
@@ -55,12 +51,12 @@ public class WriterTextCellParallel extends WriterTextCell
         * @throws IOException
         */
        @Override
-       protected void writeTextCellMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen )
+       protected void writeTextCellMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen )
                throws IOException
        {
                //estimate output size and number of output blocks (min 1)
-               int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), src.getNonZeros(), 
-                                             OutputInfo.TextCellOutputInfo)  / InfrastructureAnalyzer.getHDFSBlockSize());
+               int numPartFiles = (int)(OptimizerUtils.estimateSizeTextOutput(src.getNumRows(), src.getNumColumns(), 
+                               src.getNonZeros(), OutputInfo.TextCellOutputInfo)  / InfrastructureAnalyzer.getHDFSBlockSize());
                numPartFiles = Math.max(numPartFiles, 1);
                
                //determine degree of parallelism
@@ -68,8 +64,8 @@ public class WriterTextCellParallel extends WriterTextCell
                numThreads = Math.min(numThreads, numPartFiles);
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
-               if( numThreads <= 1 ) {
-                       super.writeTextCellMatrixToHDFS(path, job, src, rlen, clen);
+               if( numThreads <= 1 || src.getNonZeros()==0 ) {
+                       super.writeTextCellMatrixToHDFS(path, job, fs, src, rlen, clen);
                        return;
                }
                
@@ -84,7 +80,7 @@ public class WriterTextCellParallel extends WriterTextCell
                        int blklen = (int)Math.ceil((double)rlen / numThreads);
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {
                                Path newPath = new Path(path, String.format("0-m-%05d",i));
-                               tasks.add(new WriteTextTask(newPath, job, src, i*blklen, (int)Math.min((i+1)*blklen, rlen)));
+                               tasks.add(new WriteTextTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen)));
                        }
 
                        //wait until all tasks have been executed
@@ -105,18 +101,19 @@ public class WriterTextCellParallel extends WriterTextCell
         * 
         * 
         */
-       private static class WriteTextTask implements Callable<Object> 
+       private class WriteTextTask implements Callable<Object> 
        {
                private JobConf _job = null;
+               private FileSystem _fs = null;
                private MatrixBlock _src = null;
                private Path _path =null;
                private int _rl = -1;
                private int _ru = -1;
                
-               public WriteTextTask(Path path, JobConf job, MatrixBlock src, int rl, int ru)
-               {
+               public WriteTextTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int ru) {
                        _path = path;
                        _job = job;
+                       _fs = fs;
                        _src = src;
                        _rl = rl;
                        _ru = ru;
@@ -125,71 +122,7 @@ public class WriterTextCellParallel extends WriterTextCell
                @Override
                public Object call() throws Exception 
                {
-                       boolean entriesWritten = false;
-                       FileSystem fs = FileSystem.get(_job);
-                       BufferedWriter bw = null;                       
-                       int cols = _src.getNumColumns();
-
-                       try
-                       {
-                               //for obj reuse and preventing repeated buffer re-allocations
-                               StringBuilder sb = new StringBuilder();
-                       bw = new BufferedWriter(new OutputStreamWriter(fs.create(_path,true)));
-                               
-                               if( _src.isInSparseFormat() ) //SPARSE
-                               {                          
-                                       Iterator<IJV> iter = _src.getSparseBlockIterator(_rl, _ru);
-                                       
-                                       while( iter.hasNext() )
-                                       {
-                                               IJV cell = iter.next();
-
-                                               sb.append(cell.getI()+1);
-                                               sb.append(' ');
-                                               sb.append(cell.getJ()+1);
-                                               sb.append(' ');
-                                               sb.append(cell.getV());
-                                               sb.append('\n');
-                                               bw.write( sb.toString() );
-                                               sb.setLength(0); 
-                                               entriesWritten = true;
-                                       }
-                               }
-                               else //DENSE
-                               {
-                                       for( int i=_rl; i<_ru; i++ )
-                                       {
-                                               String rowIndex = Integer.toString(i+1);                                        
-                                               for( int j=0; j<cols; j++ )
-                                               {
-                                                       double lvalue = _src.getValueDenseUnsafe(i, j);
-                                                       if( lvalue != 0 ) //for nnz
-                                                       {
-                                                               sb.append(rowIndex);
-                                                               sb.append(' ');
-                                                               sb.append( j+1 );
-                                                               sb.append(' ');
-                                                               sb.append( lvalue );
-                                                               sb.append('\n');
-                                                               bw.write( sb.toString() );
-                                                               sb.setLength(0); 
-                                                               entriesWritten = true;
-                                                       }
-                                                       
-                                               }
-                                       }
-                               }
-                               
-                               //handle empty result
-                               if ( !entriesWritten ) {
-                               bw.write("1 1 0\n");
-                               }
-                       }
-                       finally
-                       {
-                               IOUtilFunctions.closeSilently(bw);
-                       }
-                       
+                       writeTextCellMatrixToFile(_path, _job, _fs, _src, _rl, _ru);
                        return null;
                }
        }

Reply via email to