[SYSTEMML-630] New parallel binary block frame reader, cleanup

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2163bfb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2163bfb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2163bfb4

Branch: refs/heads/master
Commit: 2163bfb44ff26915adf18dfa4def3af8dcc6822e
Parents: fdf5518
Author: Matthias Boehm <[email protected]>
Authored: Tue Jun 7 17:59:01 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Jun 7 17:59:01 2016 -0700

----------------------------------------------------------------------
 .../apache/sysml/runtime/io/FrameReader.java    |  25 +++++
 .../runtime/io/FrameReaderBinaryBlock.java      | 100 +++++++++--------
 .../io/FrameReaderBinaryBlockParallel.java      | 108 +++++++++++++++++++
 .../sysml/runtime/io/FrameReaderFactory.java    |  55 +++-------
 .../sysml/runtime/matrix/data/FrameBlock.java   |   4 +-
 5 files changed, 207 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index a39d129..6a4b469 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -22,6 +22,7 @@ package org.apache.sysml.runtime.io;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -165,6 +166,31 @@ public abstract class FrameReader
        
        /**
         * 
+        * Returns the output schema: ncol copies of a single-entry schema (if ncol > 1), otherwise the given schema.
+        * @param schema value types of the input; a single entry is replicated for all columns
+        * @param ncol number of columns of the output frame
+        * @return expanded schema with ncol entries, or the given schema instance unchanged
+        */
+       protected static List<ValueType> createOutputSchema(List<ValueType> 
schema, long ncol) {
+               if( schema.size()==1 && ncol > 1 ) //NOTE(review): assumes ncol fits into int
+                       return Collections.nCopies((int)ncol, schema.get(0)); //NOTE(review): nCopies list is presumably immutable - confirm callers do not modify it
+               return schema;
+       }
+       
+       /**
+        * Returns the output column names: default names C1..Cncol if a single name is given and ncol > 1, otherwise the given names.
+        * @param names column names of the input
+        * @param ncol number of columns of the output frame
+        * @return default column names (the single given name is discarded), or the given names instance unchanged
+        */
+       protected static List<String> createOutputNames(List<String> names, 
long ncol) {
+               if( names.size()==1 && ncol > 1 ) //NOTE(review): assumes ncol fits into int
+                       return FrameBlock.createColNames((int)ncol);
+               return names;
+       }
+       
+       /**
+        * 
         * @param fs
         * @param path
         * @throws IOException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index 534decf..f2a4a8a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -33,7 +33,10 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 
 
-
+/**
+ * Single-threaded frame binary block reader.
+ * 
+ */
 public class FrameReaderBinaryBlock extends FrameReader
 {
        /**
@@ -48,12 +51,13 @@ public class FrameReaderBinaryBlock extends FrameReader
         * @throws IOException 
         */
        @Override
-       public FrameBlock readFrameFromHDFS(String fname, List<ValueType> 
schema, List<String> names,
-                       long rlen, long clen) 
-                       throws IOException, DMLRuntimeException 
+       public final FrameBlock readFrameFromHDFS(String fname, List<ValueType> 
schema, List<String> names, long rlen, long clen) 
+               throws IOException, DMLRuntimeException 
        {
                //allocate output frame block
-               FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+               List<ValueType> lschema = createOutputSchema(schema, clen);
+               List<String> lnames = createOutputNames(names, clen);
+               FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
@@ -63,21 +67,13 @@ public class FrameReaderBinaryBlock extends FrameReader
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
        
-               //core read 
+               //core read (sequential/parallel)
                readBinaryBlockFrameFromHDFS(path, job, fs, ret, rlen, clen);
                
                return ret;
        }
        
        /**
-        * Note: For efficiency, we directly use SequenceFile.Reader instead of 
SequenceFileInputFormat-
-        * InputSplits-RecordReader (SequenceFileRecordReader). First, this has 
no drawbacks since the
-        * SequenceFileRecordReader internally uses SequenceFile.Reader as 
well. Second, it is 
-        * advantageous if the actual sequence files are larger than the file 
splits created by   
-        * informat.getSplits (which is usually aligned to the HDFS block size) 
because then there is 
-        * overhead for finding the actual split between our 1k-1k blocks. This 
case happens
-        * if the read frame was create by CP or when jobs directly write to 
large output files 
-        * (e.g., parfor matrix partitioning).
         * 
         * @param path
         * @param job
@@ -89,46 +85,62 @@ public class FrameReaderBinaryBlock extends FrameReader
         * @throws IOException
         * @throws DMLRuntimeException 
         */
-       @SuppressWarnings("deprecation")
-       private static void readBinaryBlockFrameFromHDFS( Path path, JobConf 
job, FileSystem fs, FrameBlock dest, long rlen, long clen )
+       protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, 
FileSystem fs, FrameBlock dest, long rlen, long clen )
+               throws IOException, DMLRuntimeException
+       {
+               //sequential read from sequence files
+               for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+                       readBinaryBlockFrameFromSequenceFile(lpath, job, fs, 
dest);
+       }
+       
+       /**
+        * Reads all blocks of a single sequence file into the given output frame.
+        * @param path path of a single sequence file (part file)
+        * @param job job configuration used to open the sequence file reader
+        * @param fs file system of the given path
+        * @param dest output frame; its number of rows and columns bound the valid block range
+        * @throws IOException if reading fails or a block lies outside the output frame
+        * @throws DMLRuntimeException
+        */
+       @SuppressWarnings({ "deprecation", "resource" })
+       protected final void readBinaryBlockFrameFromSequenceFile( Path path, 
JobConf job, FileSystem fs, FrameBlock dest )
                throws IOException, DMLRuntimeException
        {
+               int rlen = dest.getNumRows();
+               int clen = dest.getNumColumns();
+               
+               //directly read from sequence files (individual partfiles)
+               SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,path,job);
                LongWritable key = new LongWritable(-1L);
                FrameBlock value = new FrameBlock();
                
-               for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+               try
                {
-                       //directly read from sequence files (individual 
partfiles)
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
-                       
-                       try
-                       {
-                               //note: next(key, value) does not yet exploit 
the given serialization classes, record reader does but is generally slower.
-                               while( reader.next(key, value) ) {      
-                                       int row_offset = (int)(key.get()-1);
-                                       
-                                       int rows = value.getNumRows();
-                                       int cols = value.getNumColumns();
+                       //note: next(key, value) does not yet exploit the given 
serialization classes, record reader does but is generally slower.
+                       while( reader.next(key, value) ) {      
+                               int row_offset = (int)(key.get()-1);
+                               
+                               int rows = value.getNumRows();
+                               int cols = value.getNumColumns();
 
-                                       if(rows == 0 || cols == 0)      //Empty 
block, ignore it.
-                                               continue;
-                                       
-                                       //bound check per block
-                                       if( row_offset + rows < 0 || row_offset 
+ rows > rlen ) {
-                                               throw new IOException("Frame 
block ["+(row_offset+1)+":"+(row_offset+rows)+","+":"+"] " +
                                                                             
"out of overall frame range [1:"+rlen+",1:"+clen+"].");
-                                       }
-                       
-                                       dest.copy( row_offset, 
row_offset+rows-1, 
-                                                       0, cols-1, value);
+                               if(rows == 0 || cols == 0)      //Empty block, 
ignore it.
+                                       continue;
+                               
+                               //bound check per block (negative offset, int overflow of the row end, row and column extent)
+                               if( row_offset < 0 || row_offset + rows < 0 || row_offset + rows > rlen || cols > clen ) {
+                                       throw new IOException("Frame block ["+(row_offset+1)+":"+(row_offset+rows)+",1:"+cols+"] " +
+                                                                     "out of overall frame range [1:"+rlen+",1:"+clen+"].");
                                }
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(reader);
+               
+                               dest.copy( row_offset, row_offset+rows-1, 
+                                               0, cols-1, value);
                        }
                }
-       }
-
+               finally {
+                       IOUtilFunctions.closeSilently(reader);
+               }
+       }       
+       
        /**
         * Specific functionality of FrameReaderBinaryBlock, mostly used for 
testing.
         * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
new file mode 100644
index 0000000..d684af0
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+
+
+/**
+ * Multi-threaded frame binary block reader, which reads the individual
+ * sequence files (part files) of a binary block frame in parallel.
+ * 
+ */
+public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock
+{
+       /**
+        * Reads all sequence files of the given path into the output frame,
+        * using one read task per file on a fixed-size thread pool. The number
+        * of threads is bounded by the parallel binary read parallelism of
+        * OptimizerUtils and by the number of files.
+        * 
+        * @param path input path, resolved to 1..N sequence files
+        * @param job job configuration used to open the sequence file readers
+        * @param fs file system of the input path
+        * @param dest output frame block, shared by all read tasks
+        * @param rlen number of rows of the output frame
+        * @param clen number of columns of the output frame
+        * 
+        * @throws IOException if listing or reading any file fails, or the read is interrupted
+        * @throws DMLRuntimeException declared by the overridden method; failures surface as IOException
+        */
+       @Override
+       protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen )
+               throws IOException, DMLRuntimeException
+       {
+               ExecutorService pool = null;
+               try 
+               {
+                       //create read tasks for all files
+                       ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>();
+                       for( Path lpath : getSequenceFilePaths(fs, path) )
+                               tasks.add(new ReadFileTask(lpath, job, fs, dest));
+                       
+                       //no more threads than files, but at least one (required by the pool)
+                       int numThreads = Math.max(1, Math.min(tasks.size(), 
+                               OptimizerUtils.getParallelBinaryReadParallelism()));
+                       pool = Executors.newFixedThreadPool(numThreads);
+                       
+                       //wait until all tasks have been executed
+                       List<Future<Object>> rt = pool.invokeAll(tasks);
+                       
+                       //check for exceptions (task failures are rethrown by get)
+                       for( Future<Object> task : rt )
+                               task.get();
+               }
+               catch(InterruptedException e) {
+                       //restore the interrupt status before converting the exception
+                       Thread.currentThread().interrupt();
+                       throw new IOException("Failed parallel read of binary block input.", e);
+               }
+               catch(Exception e) {
+                       throw new IOException("Failed parallel read of binary block input.", e);
+               }
+               finally {
+                       //release the worker threads, also on failure
+                       if( pool != null )
+                               pool.shutdown();
+               }
+       }
+       
+       /**
+        * Read task for a single sequence file into the shared output frame.
+        */
+       private class ReadFileTask implements Callable<Object> 
+       {
+               private final Path _path;
+               private final JobConf _job;
+               private final FileSystem _fs;
+               private final FrameBlock _dest;
+               
+               public ReadFileTask(Path path, JobConf job, FileSystem fs, FrameBlock dest) {
+                       _path = path;
+                       _job = job;
+                       _fs = fs;
+                       _dest = dest;
+               }
+
+               @Override
+               public Object call() throws Exception 
+               {
+                       //NOTE(review): all tasks write concurrently into _dest, which
+                       //assumes that distinct files hold disjoint row ranges - confirm
+                       readBinaryBlockFrameFromSequenceFile(_path, _job, _fs, _dest);
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
index 35ec5f7..51606df 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysml.runtime.io;
 
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.CompilerConfig.ConfigType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FileFormatProperties;
@@ -39,25 +41,10 @@ public class FrameReaderFactory
        public static FrameReader createFrameReader( InputInfo iinfo ) 
                throws DMLRuntimeException
        {
-               FrameReader reader = null;
+               FileFormatProperties props = (iinfo==InputInfo.CSVInputInfo) ?
+                       new CSVFileFormatProperties() : null; //default format properties for CSV, none otherwise
                
-               if( iinfo == InputInfo.TextCellInputInfo )
-               {
-                       reader = new FrameReaderTextCell();     
-               }
-               else if( iinfo == InputInfo.CSVInputInfo )
-               {
-                       reader = new FrameReaderTextCSV(new 
CSVFileFormatProperties());
-               }
-               else if( iinfo == InputInfo.BinaryBlockInputInfo ) {
-                       reader = new FrameReaderBinaryBlock();
-               }
-               else {
-                       throw new DMLRuntimeException("Failed to create frame 
reader for unknown input info: "
-                                                  + 
InputInfo.inputInfoToString(iinfo));
-               }
-               
-               return reader;
+               return createFrameReader(iinfo, props); //reader selection is shared with the (iinfo, props) overload
        }
        
        /**
@@ -66,31 +53,18 @@ public class FrameReaderFactory
         * @return
         * @throws DMLRuntimeException
         */
-       public static FrameReader createFrameReader( ReadProperties props ) 
+       public static FrameReader createFrameReader( ReadProperties rprops ) 
                throws DMLRuntimeException
        {
                //check valid read properties
-               if( props == null )
+               if( rprops == null )
                        throw new DMLRuntimeException("Failed to create frame 
reader with empty properties.");
                
-               FrameReader reader = null;
-               InputInfo iinfo = props.inputInfo;
-
-               if( iinfo == InputInfo.TextCellInputInfo ) {
-                       reader = new FrameReaderTextCell();
-               }
-               else if( iinfo == InputInfo.CSVInputInfo ) {
-                       reader = new FrameReaderTextCSV( 
props.formatProperties!=null ? (CSVFileFormatProperties)props.formatProperties 
: new CSVFileFormatProperties());
-               }
-               else if( iinfo == InputInfo.BinaryBlockInputInfo ) {
-                       reader = new FrameReaderBinaryBlock();
-               }
-               else {
-                       throw new DMLRuntimeException("Failed to create frame 
reader for unknown input info: "
-                                                  + 
InputInfo.inputInfoToString(iinfo));
-               }
-               
-               return reader;
+               InputInfo iinfo = rprops.inputInfo;
+               FileFormatProperties props = (iinfo==InputInfo.CSVInputInfo) ? 
((rprops.formatProperties!=null) ? 
+                       (CSVFileFormatProperties)rprops.formatProperties : new 
CSVFileFormatProperties()) : null; //given or default properties for CSV, none otherwise
+               //NOTE(review): the cast above throws ClassCastException if CSV input carries non-CSV format properties
+               return createFrameReader(iinfo, props);
        }
 
        
@@ -114,7 +88,10 @@ public class FrameReaderFactory
                        reader = new FrameReaderTextCSV( 
(CSVFileFormatProperties)props);
                }
                else if( iinfo == InputInfo.BinaryBlockInputInfo ) {
-                       reader = new FrameReaderBinaryBlock();
+                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS)
 )
+                               reader = new FrameReaderBinaryBlockParallel();
+                       else
+                               reader = new FrameReaderBinaryBlock();
                }
                else {
                        throw new DMLRuntimeException("Failed to create frame 
reader for unknown input info: "

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index b851704..aef736b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -198,7 +198,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * @param size
         * @return
         */
-       private static List<String> createColNames(int size) {
+       public static List<String> createColNames(int size) {
                ArrayList<String> ret = new ArrayList<String>(size);
                for( int i=1; i<=size; i++ )
                        ret.add(createColName(i));
@@ -210,7 +210,7 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
         * @param i
         * @return
         */
-       private static String createColName(int i) {
+       public static String createColName(int i) { //default column name for 1-based index i, e.g., C1 for i=1
                return "C" + i;
        }
        

Reply via email to