Repository: incubator-systemml
Updated Branches:
  refs/heads/master 6bbae619a -> dea42de1f


[SYSTEMML-630] New parallel csv/textcell frame writers, cleanup

This patch adds, similar to the parallel binary block frame writer,
parallel frame writers for csv and textcell. On a 1Mx1k (8GB binary,
~25-30GB in csv/text) scenario, multi-threaded write achieved the
following improvements:

* Textcell: 588s (single-threaded) -> 59s (multi-threaded)
* CSV: 518s (single-threaded) -> 47s (multi-threaded) 


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/fdf55181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/fdf55181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/fdf55181

Branch: refs/heads/master
Commit: fdf551813572d6fd4f0d8076a525d4b1b3ab1b53
Parents: 6bbae61
Author: Matthias Boehm <[email protected]>
Authored: Tue Jun 7 13:31:15 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Jun 7 13:31:15 2016 -0700

----------------------------------------------------------------------
 .../io/FrameWriterBinaryBlockParallel.java      |   7 +-
 .../sysml/runtime/io/FrameWriterFactory.java    |  10 +-
 .../sysml/runtime/io/FrameWriterTextCSV.java    |  62 ++++++---
 .../runtime/io/FrameWriterTextCSVParallel.java  | 133 +++++++++++++++++++
 .../sysml/runtime/io/FrameWriterTextCell.java   |  68 ++++++----
 .../runtime/io/FrameWriterTextCellParallel.java | 128 ++++++++++++++++++
 .../sysml/runtime/matrix/data/FrameBlock.java   |  44 +++++-
 7 files changed, 396 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
index f0b5bd1..2ef7692 100644
--- 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
+++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
@@ -56,13 +56,12 @@ public class FrameWriterBinaryBlockParallel extends 
FrameWriterBinaryBlock
         * @throws DMLRuntimeException
         */
        protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, 
FrameBlock src, long rlen, long clen )
-                       throws IOException, DMLRuntimeException
+               throws IOException, DMLRuntimeException
        {
                //estimate output size and number of output blocks (min 1)
                int blen = ConfigurationManager.getBlocksize();
-               int numPartFiles = 
(int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, blen, 
blen, rlen*clen) 
-                                                  / 
InfrastructureAnalyzer.getHDFSBlockSize());
-               numPartFiles = Math.max(numPartFiles, 1);
+               int numPartFiles = 
Math.max((int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, 
blen, blen, rlen*clen) 
+                                                  / 
InfrastructureAnalyzer.getHDFSBlockSize()), 1);
                
                //determine degree of parallelism
                int numThreads = 
OptimizerUtils.getParallelBinaryWriteParallelism();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java
index a7f132f..c1021e6 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java
@@ -57,12 +57,18 @@ public class FrameWriterFactory
                FrameWriter writer = null;
                
                if( oinfo == OutputInfo.TextCellOutputInfo ) {
-                       writer = new FrameWriterTextCell();
+                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS)
 )
+                               writer = new FrameWriterTextCellParallel();
+                       else
+                               writer = new FrameWriterTextCell();
                }
                else if( oinfo == OutputInfo.CSVOutputInfo ) {
                        if( props!=null && !(props instanceof 
CSVFileFormatProperties) )
                                throw new DMLRuntimeException("Wrong type of 
file format properties for CSV writer.");
-                       writer = new 
FrameWriterTextCSV((CSVFileFormatProperties)props);
+                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS)
 )
+                               writer = new 
FrameWriterTextCSVParallel((CSVFileFormatProperties)props);
+                       else
+                               writer = new 
FrameWriterTextCSV((CSVFileFormatProperties)props);        
                }
                else if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
                        if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS)
 )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
index 455f44d..addf798 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
@@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
+ * Single-threaded frame text csv writer.
  * 
  */
 public class FrameWriterTextCSV extends FrameWriter
@@ -58,22 +59,23 @@ public class FrameWriterTextCSV extends FrameWriter
         * @throws DMLRuntimeException  
         */
        @Override
-       public void writeFrameToHDFS(FrameBlock src, String fname, long rlen, 
long clen) 
+       public final void writeFrameToHDFS(FrameBlock src, String fname, long 
rlen, long clen) 
                throws IOException, DMLRuntimeException 
        {
-               //validity check frame dimensions
-               if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
-                       throw new IOException("Frame dimensions mismatch with 
metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
-               }
-               
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path( fname );
 
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
-                       
-               //core write
+       
+               //validity check frame dimensions
+               if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
+                       throw new IOException("Frame dimensions mismatch with 
metadata: " + 
+                                       
src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
+               }
+               
+               //core write (sequential/parallel)
                writeCSVFrameToHDFS(path, job, src, rlen, clen, _props);
        }
 
@@ -84,16 +86,36 @@ public class FrameWriterTextCSV extends FrameWriter
         * @param src
         * @param rlen
         * @param clen
+        * @param csvprops
+        * @throws IOException
+        */
+       protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock 
src, long rlen, long clen, CSVFileFormatProperties csvprops ) 
+               throws IOException
+       {
+               FileSystem fs = FileSystem.get(job);
+        
+               //sequential write to single text file
+               writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, 
csvprops);        
+       }
+       
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param src
+        * @param rlen
+        * @param clen
         * @param props
         * @return
         * @throws IOException
         */
-       protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock 
src, long rlen, long clen, CSVFileFormatProperties props )
+       protected final void writeCSVFrameToFile( Path path, JobConf job, 
FileSystem fs, FrameBlock src, int rl, int ru, CSVFileFormatProperties props )
                throws IOException
        {
-               FileSystem fs = FileSystem.get(job);
-        BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));            
-               
+       //create buffered writer
+               BufferedWriter br=new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));             
+       int cols = src.getNumColumns();
+       
                try
                {
                        //for obj reuse and preventing repeated buffer 
re-allocations
@@ -103,13 +125,13 @@ public class FrameWriterTextCSV extends FrameWriter
                        String delim = props.getDelim();
                        
                        // Write header line, if needed
-                       if( props.hasHeader() ) 
+                       if( props.hasHeader() && rl==0 ) 
                        {
                                //write row chunk-wise to prevent OOM on large 
number of columns
-                               for( int bj=0; bj<clen; bj+=BLOCKSIZE_J ) {
-                                       for( int j=bj; j < 
Math.min(clen,bj+BLOCKSIZE_J); j++) {
+                               for( int bj=0; bj<cols; bj+=BLOCKSIZE_J ) {
+                                       for( int j=bj; j < 
Math.min(cols,bj+BLOCKSIZE_J); j++) {
                                                sb.append("C"+ (j+1));
-                                               if ( j < clen-1 )
+                                               if ( j < cols-1 )
                                                        sb.append(delim);
                                        }
                                        br.write( sb.toString() );
@@ -121,15 +143,15 @@ public class FrameWriterTextCSV extends FrameWriter
                        }
                        
                        // Write data lines
-                       Iterator<String[]> iter = src.getStringRowIterator();
+                       Iterator<String[]> iter = src.getStringRowIterator(rl, 
ru);
                        while( iter.hasNext() ) {
                                //write row chunk-wise to prevent OOM on large 
number of columns
                                String[] row = iter.next();
-                               for( int bj=0; bj<clen; bj+=BLOCKSIZE_J ) {
-                                       for( int j=bj; 
j<Math.min(clen,bj+BLOCKSIZE_J); j++ ) {
+                               for( int bj=0; bj<cols; bj+=BLOCKSIZE_J ) {
+                                       for( int j=bj; 
j<Math.min(cols,bj+BLOCKSIZE_J); j++ ) {
                                                if(row[j] != null)
                                                        sb.append(row[j]);      
                                
-                                               if( j != clen-1 )
+                                               if( j != cols-1 )
                                                        sb.append(delim);
                                        }
                                        br.write( sb.toString() );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
new file mode 100644
index 0000000..de492e2
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.hops.OptimizerUtils;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.MapReduceTool;
+
+/**
+ * Multi-threaded frame text csv writer.
+ * 
+ */
+public class FrameWriterTextCSVParallel extends FrameWriterTextCSV
+{
+       public FrameWriterTextCSVParallel( CSVFileFormatProperties props ) {
+               super(props);
+       }
+
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param src
+        * @param rlen
+        * @param clen
+        * @param csvprops
+        * @throws IOException
+        */
+       @Override
+       protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock 
src, long rlen, long clen, CSVFileFormatProperties csvprops ) 
+               throws IOException
+       {
+               //estimate output size and number of output blocks (min 1)
+               int numPartFiles = 
Math.max((int)(OptimizerUtils.estimateSizeTextOutput(rlen, clen, rlen*clen, 
+                                             OutputInfo.CSVOutputInfo)  / 
InfrastructureAnalyzer.getHDFSBlockSize()), 1);
+               
+               //determine degree of parallelism
+               int numThreads = 
OptimizerUtils.getParallelTextWriteParallelism();
+               numThreads = Math.min(numThreads, numPartFiles);
+               
+               //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
+               if( numThreads <= 1 ) {
+                       super.writeCSVFrameToHDFS(path, job, src, rlen, clen, 
csvprops);
+                       return;
+               }
+               
+               //create directory for concurrent tasks
+               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               FileSystem fs = FileSystem.get(job);
+               
+               //create and execute tasks
+               try 
+               {
+                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ArrayList<WriteFileTask> tasks = new 
ArrayList<WriteFileTask>();
+                       int blklen = (int)Math.ceil((double)rlen / numThreads);
+                       for(int i=0; i<numThreads & i*blklen<rlen; i++) {
+                               Path newPath = new Path(path, 
String.format("0-m-%05d",i));
+                               tasks.add(new WriteFileTask(newPath, job, fs, 
src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops));
+                       }
+
+                       //wait until all tasks have been executed
+                       List<Future<Object>> rt = pool.invokeAll(tasks);        
+                       pool.shutdown();
+                       
+                       //check for exceptions 
+                       for( Future<Object> task : rt )
+                               task.get();
+               } 
+               catch (Exception e) {
+                       throw new IOException("Failed parallel write of csv 
output.", e);
+               }
+       }
+       
+       private class WriteFileTask implements Callable<Object> 
+       {
+               private Path _path = null;
+               private JobConf _job = null;
+               private FileSystem _fs = null;
+               private FrameBlock _src = null;
+               private int _rl = -1;
+               private int _ru = -1;
+               private CSVFileFormatProperties _csvprops = null;
+               
+               public WriteFileTask(Path path, JobConf job, FileSystem fs, 
FrameBlock src, int rl, int ru, CSVFileFormatProperties csvprops) {
+                       _path = path;
+                       _fs = fs;
+                       _job = job;
+                       _src = src;
+                       _rl = rl;
+                       _ru = ru;
+                       _csvprops = csvprops;
+               }
+       
+               @Override
+               public Object call() throws Exception  {
+                       writeCSVFrameToFile(_path, _job, _fs, _src, _rl, _ru, 
_csvprops);
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
index 8064ce5..38348ad 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
@@ -32,6 +32,10 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
+/**
+ * Single-threaded frame text cell writer.
+ * 
+ */
 public class FrameWriterTextCell extends FrameWriter
 {
        /**
@@ -42,22 +46,23 @@ public class FrameWriterTextCell extends FrameWriter
         * @throws DMLRuntimeException 
         */
        @Override
-       public void writeFrameToHDFS( FrameBlock src, String fname, long rlen, 
long clen )
+       public final void writeFrameToHDFS( FrameBlock src, String fname, long 
rlen, long clen )
                throws IOException, DMLRuntimeException 
        {
-               //validity check frame dimensions
-               if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
-                       throw new IOException("Frame dimensions mismatch with 
metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
-               }
-                               
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path( fname );
 
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
-                       
-               //core write
+               
+               //validity check frame dimensions
+               if( src.getNumRows() != rlen || src.getNumColumns() != clen ) {
+                       throw new IOException("Frame dimensions mismatch with 
metadata: " + 
+                                       
src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+".");
+               }
+               
+               //core write (sequential/parallel)
                writeTextCellFrameToHDFS(path, job, src, src.getNumRows(), 
src.getNumColumns());
        }
 
@@ -68,38 +73,51 @@ public class FrameWriterTextCell extends FrameWriter
         * @param src
         * @param rlen
         * @param clen
-        * @param brlen
-        * @param bclen
         * @throws IOException
         */
        protected void writeTextCellFrameToHDFS( Path path, JobConf job, 
FrameBlock src, long rlen, long clen )
                throws IOException
        {
-               boolean entriesWritten = false;
                FileSystem fs = FileSystem.get(job);
-        BufferedWriter br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));          
                
-       int rows = src.getNumRows();
-               int cols = src.getNumColumns();
+               //sequential write to single text file
+               writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen);     
+       }       
+       
+       /**
+        * Internal primitive to write a row range of a frame to a single text 
file, 
+        * which is used for both single- and multi-threaded writers (for 
consistency). 
+        *  
+        * @param path
+        * @param job
+        * @param fs
+        * @param src
+        * @param rl
+        * @param ru
+        * @throws IOException 
+        */
+       protected final void writeTextCellFrameToFile( Path path, JobConf job, 
FileSystem fs, FrameBlock src, int rl, int ru ) 
+               throws IOException
+       {
+               boolean entriesWritten = false;
+       int cols = src.getNumColumns();
+
+       //create buffered writer 
+       BufferedWriter br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));           
 
-               //bound check per block
-               if( rows > rlen || cols > clen ) {
-                       throw new IOException("Frame block 
[1:"+rows+",1:"+cols+"] " +
-                                                     "out of overall frame 
range [1:"+rlen+",1:"+clen+"].");
-               }
-               
                try
                {
                        //for obj reuse and preventing repeated buffer 
re-allocations
                        StringBuilder sb = new StringBuilder();
                        
-                       Iterator<String[]> iter = src.getStringRowIterator();
-                       for( int i=0; iter.hasNext(); i++ ) { //for all rows
+                       //write frame row range to output
+                       Iterator<String[]> iter = src.getStringRowIterator(rl, 
ru);
+                       for( int i=rl; iter.hasNext(); i++ ) { //for all rows
                                String rowIndex = Integer.toString(i+1);
                                String[] row = iter.next();
                                for( int j=0; j<cols; j++ ) {
                                        if( row[j] != null ) {
-                                               sb.append(rowIndex);
+                                               sb.append( rowIndex );
                                                sb.append(' ');
                                                sb.append( j+1 );
                                                sb.append(' ');
@@ -119,6 +137,6 @@ public class FrameWriterTextCell extends FrameWriter
                }
                finally {
                        IOUtilFunctions.closeSilently(br);
-               }
-       }       
+               }               
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
new file mode 100644
index 0000000..82a139d
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.hops.OptimizerUtils;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.MapReduceTool;
+
+/**
+ * Multi-threaded frame text cell writer.
+ * 
+ */
+public class FrameWriterTextCellParallel extends FrameWriterTextCell
+{
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param src
+        * @param rlen
+        * @param clen
+        * @throws IOException
+        */
+       @Override
+       protected void writeTextCellFrameToHDFS( Path path, JobConf job, 
FrameBlock src, long rlen, long clen )
+               throws IOException
+       {
+               //estimate output size and number of output blocks (min 1)
+               int numPartFiles = 
Math.max((int)(OptimizerUtils.estimateSizeTextOutput(rlen, clen, rlen*clen, 
+                                             OutputInfo.TextCellOutputInfo)  / 
InfrastructureAnalyzer.getHDFSBlockSize()), 1);
+               
+               //determine degree of parallelism
+               int numThreads = 
OptimizerUtils.getParallelTextWriteParallelism();
+               numThreads = Math.min(numThreads, numPartFiles);
+               
+               //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
+               if( numThreads <= 1 ) {
+                       super.writeTextCellFrameToHDFS(path, job, src, rlen, 
clen);
+                       return;
+               }
+               
+               //create directory for concurrent tasks
+               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               FileSystem fs = FileSystem.get(job);
+               
+               //create and execute tasks
+               try 
+               {
+                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       ArrayList<WriteFileTask> tasks = new 
ArrayList<WriteFileTask>();
+                       int blklen = (int)Math.ceil((double)rlen / numThreads);
+                       for(int i=0; i<numThreads & i*blklen<rlen; i++) {
+                               Path newPath = new Path(path, 
String.format("0-m-%05d",i));
+                               tasks.add(new WriteFileTask(newPath, job, fs, 
src, i*blklen, (int)Math.min((i+1)*blklen, rlen)));
+                       }
+
+                       //wait until all tasks have been executed
+                       List<Future<Object>> rt = pool.invokeAll(tasks);        
+                       pool.shutdown();
+                       
+                       //check for exceptions 
+                       for( Future<Object> task : rt )
+                               task.get();
+               } 
+               catch (Exception e) {
+                       throw new IOException("Failed parallel write of text 
output.", e);
+               }
+       }       
+       
+       /**
+        * 
+        */
+       private class WriteFileTask implements Callable<Object> 
+       {
+               private Path _path = null;
+               private JobConf _job = null;
+               private FileSystem _fs = null;
+               private FrameBlock _src = null;
+               private int _rl = -1;
+               private int _ru = -1;
+               
+               public WriteFileTask(Path path, JobConf job, FileSystem fs, 
FrameBlock src, int rl, int ru) {
+                       _path = path;
+                       _fs = fs;
+                       _job = job;
+                       _src = src;
+                       _rl = rl;
+                       _ru = ru;
+               }
+       
+               @Override
+               public Object call() throws Exception  {
+                       writeTextCellFrameToFile(_path, _job, _fs, _src, _rl, 
_ru);
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 268ba44..b851704 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -348,7 +348,19 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * @return
         */
        public Iterator<String[]> getStringRowIterator() {
-               return new StringRowIterator();
+               return new StringRowIterator(0, _numRows);
+       }
+       
+       /**
+        * Get a row iterator over the frame where all fields are encoded
+        * as strings independent of their value types.  
+        * 
+        * @param rl
+        * @param ru
+        * @return
+        */
+       public Iterator<String[]> getStringRowIterator(int rl, int ru) {
+               return new StringRowIterator(rl, ru);
        }
        
        /**
@@ -358,7 +370,19 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * @return
         */
        public Iterator<Object[]> getObjectRowIterator() {
-               return new ObjectRowIterator();
+               return new ObjectRowIterator(0, _numRows);
+       }
+       
+       /**
+        * Get a row iterator over the frame where all fields are encoded
+        * as boxed objects according to their value types.  
+        * 
+        * @param rl
+        * @param ru
+        * @return
+        */
+       public Iterator<Object[]> getObjectRowIterator(int rl, int ru) {
+               return new ObjectRowIterator(rl, ru);
        }
 
        ///////
@@ -744,15 +768,17 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
        private abstract class RowIterator<T> implements Iterator<T[]> {
                protected T[] _curRow = null;
                protected int _curPos = -1;
+               protected int _maxPos = -1;
                
-               protected RowIterator() {
-                       _curPos = 0;
+               protected RowIterator(int rl, int ru) {
+                       _curPos = rl;
+                       _maxPos = ru;
                        _curRow = createRow(getNumColumns());
                }
                
                @Override
                public boolean hasNext() {
-                       return (_curPos < _numRows);
+                       return (_curPos < _maxPos);
                }
 
                @Override
@@ -767,6 +793,10 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * 
         */
        private class StringRowIterator extends RowIterator<String> {
+               public StringRowIterator(int rl, int ru) {
+                       super(rl, ru);
+               }
+               
                @Override
                protected String[] createRow(int size) {
                        return new String[size];
@@ -787,6 +817,10 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * 
         */
        private class ObjectRowIterator extends RowIterator<Object> {
+               public ObjectRowIterator(int rl, int ru) {
+                       super(rl, ru);
+               }
+               
                @Override
                protected Object[] createRow(int size) {
                        return new Object[size];

Reply via email to