[SYSTEMML-630] New parallel binary block frame reader, cleanup

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2163bfb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2163bfb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2163bfb4
Branch: refs/heads/master Commit: 2163bfb44ff26915adf18dfa4def3af8dcc6822e Parents: fdf5518 Author: Matthias Boehm <[email protected]> Authored: Tue Jun 7 17:59:01 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Jun 7 17:59:01 2016 -0700 ---------------------------------------------------------------------- .../apache/sysml/runtime/io/FrameReader.java | 25 +++++ .../runtime/io/FrameReaderBinaryBlock.java | 100 +++++++++-------- .../io/FrameReaderBinaryBlockParallel.java | 108 +++++++++++++++++++ .../sysml/runtime/io/FrameReaderFactory.java | 55 +++------- .../sysml/runtime/matrix/data/FrameBlock.java | 4 +- 5 files changed, 207 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java index a39d129..6a4b469 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java @@ -22,6 +22,7 @@ package org.apache.sysml.runtime.io; import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -165,6 +166,30 @@ public abstract class FrameReader /** * + * @param schema + * @param ncol + * @return + */ + protected static List<ValueType> createOutputSchema(List<ValueType> schema, long ncol) { + if( schema.size()==1 && ncol > 1 ) + return Collections.nCopies((int)ncol, schema.get(0)); + return schema; + } + + /** + * + * @param names + * @param ncol + * @return + */ + protected static List<String> createOutputNames(List<String> names, long ncol) { + if( names.size()==1 && ncol > 1 ) + return 
FrameBlock.createColNames((int)ncol); + return names; + } + + /** + * * @param fs * @param path * @throws IOException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java index 534decf..f2a4a8a 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java @@ -33,7 +33,10 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; - +/** + * Single-threaded frame binary block reader. + * + */ public class FrameReaderBinaryBlock extends FrameReader { /** @@ -48,12 +51,13 @@ public class FrameReaderBinaryBlock extends FrameReader * @throws IOException */ @Override - public FrameBlock readFrameFromHDFS(String fname, List<ValueType> schema, List<String> names, - long rlen, long clen) - throws IOException, DMLRuntimeException + public final FrameBlock readFrameFromHDFS(String fname, List<ValueType> schema, List<String> names, long rlen, long clen) + throws IOException, DMLRuntimeException { //allocate output frame block - FrameBlock ret = createOutputFrameBlock(schema, names, rlen); + List<ValueType> lschema = createOutputSchema(schema, clen); + List<String> lnames = createOutputNames(names, clen); + FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); @@ -63,21 +67,13 @@ public class FrameReaderBinaryBlock extends FrameReader //check existence and non-empty file checkValidInputFile(fs, path); - //core read + //core read (sequential/parallel) readBinaryBlockFrameFromHDFS(path, job, fs, ret, rlen, clen); return ret; } 
/** - * Note: For efficiency, we directly use SequenceFile.Reader instead of SequenceFileInputFormat- - * InputSplits-RecordReader (SequenceFileRecordReader). First, this has no drawbacks since the - * SequenceFileRecordReader internally uses SequenceFile.Reader as well. Second, it is - * advantageous if the actual sequence files are larger than the file splits created by - * informat.getSplits (which is usually aligned to the HDFS block size) because then there is - * overhead for finding the actual split between our 1k-1k blocks. This case happens - * if the read frame was create by CP or when jobs directly write to large output files - * (e.g., parfor matrix partitioning). * * @param path * @param job @@ -89,46 +85,62 @@ public class FrameReaderBinaryBlock extends FrameReader * @throws IOException * @throws DMLRuntimeException */ - @SuppressWarnings("deprecation") - private static void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen ) + protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen ) + throws IOException, DMLRuntimeException + { + //sequential read from sequence files + for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files + readBinaryBlockFrameFromSequenceFile(lpath, job, fs, dest); + } + + /** + * + * @param path + * @param job + * @param fs + * @param dest + * @throws IOException + * @throws DMLRuntimeException + */ + @SuppressWarnings({ "deprecation", "resource" }) + protected final void readBinaryBlockFrameFromSequenceFile( Path path, JobConf job, FileSystem fs, FrameBlock dest ) throws IOException, DMLRuntimeException { + int rlen = dest.getNumRows(); + int clen = dest.getNumColumns(); + + //directly read from sequence files (individual partfiles) + SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,job); LongWritable key = new LongWritable(-1L); FrameBlock value = new FrameBlock(); - for( Path lpath : 
getSequenceFilePaths(fs, path) ) //1..N files + try { - //directly read from sequence files (individual partfiles) - SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); - - try - { - //note: next(key, value) does not yet exploit the given serialization classes, record reader does but is generally slower. - while( reader.next(key, value) ) { - int row_offset = (int)(key.get()-1); - - int rows = value.getNumRows(); - int cols = value.getNumColumns(); + //note: next(key, value) does not yet exploit the given serialization classes, record reader does but is generally slower. + while( reader.next(key, value) ) { + int row_offset = (int)(key.get()-1); + + int rows = value.getNumRows(); + int cols = value.getNumColumns(); - if(rows == 0 || cols == 0) //Empty block, ignore it. - continue; - - //bound check per block - if( row_offset + rows < 0 || row_offset + rows > rlen ) { - throw new IOException("Frame block ["+(row_offset+1)+":"+(row_offset+rows)+","+":"+"] " + - "out of overall frame range [1:"+rlen+",1:"+clen+"]."); - } - - dest.copy( row_offset, row_offset+rows-1, - 0, cols-1, value); + if(rows == 0 || cols == 0) //Empty block, ignore it. + continue; + + //bound check per block + if( row_offset + rows < 0 || row_offset + rows > rlen ) { + throw new IOException("Frame block ["+(row_offset+1)+":"+(row_offset+rows)+","+":"+"] " + + "out of overall frame range [1:"+rlen+",1:"+clen+"]."); } - } - finally { - IOUtilFunctions.closeSilently(reader); + + dest.copy( row_offset, row_offset+rows-1, + 0, cols-1, value); } } - } - + finally { + IOUtilFunctions.closeSilently(reader); + } + } + /** * Specific functionality of FrameReaderBinaryBlock, mostly used for testing. 
* http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java new file mode 100644 index 0000000..d684af0 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.matrix.data.FrameBlock; + + +/** + * Multi-threaded frame binary block reader. 
+ * + */ +public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock +{ + /** + * + * @param path + * @param job + * @param fs + * @param dest + * @param rlen + * @param clen + * + * @throws IOException + * @throws DMLRuntimeException + */ + protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen ) + throws IOException, DMLRuntimeException + { + int numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + + try + { + //create read tasks for all files + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>(); + for( Path lpath : getSequenceFilePaths(fs, path) ){ + ReadFileTask t = new ReadFileTask(lpath, job, fs, dest); + tasks.add(t); + } + + //wait until all tasks have been executed + List<Future<Object>> rt = pool.invokeAll(tasks); + pool.shutdown(); + + //check for exceptions and aggregate nnz + for( Future<Object> task : rt ) + task.get(); + } + catch (Exception e) { + throw new IOException("Failed parallel read of binary block input.", e); + } + } + + /** + * + */ + private class ReadFileTask implements Callable<Object> + { + private Path _path = null; + private JobConf _job = null; + private FileSystem _fs = null; + private FrameBlock _dest = null; + + public ReadFileTask(Path path, JobConf job, FileSystem fs, FrameBlock dest) { + _path = path; + _fs = fs; + _job = job; + _dest = dest; + } + + @Override + public Object call() throws Exception + { + readBinaryBlockFrameFromSequenceFile(_path, _job, _fs, _dest); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java index 
35ec5f7..51606df 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java @@ -19,6 +19,8 @@ package org.apache.sysml.runtime.io; +import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.conf.CompilerConfig.ConfigType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FileFormatProperties; @@ -39,25 +41,10 @@ public class FrameReaderFactory public static FrameReader createFrameReader( InputInfo iinfo ) throws DMLRuntimeException { - FrameReader reader = null; + FileFormatProperties props = (iinfo==InputInfo.CSVInputInfo) ? + new CSVFileFormatProperties() : null; - if( iinfo == InputInfo.TextCellInputInfo ) - { - reader = new FrameReaderTextCell(); - } - else if( iinfo == InputInfo.CSVInputInfo ) - { - reader = new FrameReaderTextCSV(new CSVFileFormatProperties()); - } - else if( iinfo == InputInfo.BinaryBlockInputInfo ) { - reader = new FrameReaderBinaryBlock(); - } - else { - throw new DMLRuntimeException("Failed to create frame reader for unknown input info: " - + InputInfo.inputInfoToString(iinfo)); - } - - return reader; + return createFrameReader(iinfo, props); } /** @@ -66,31 +53,18 @@ public class FrameReaderFactory * @return * @throws DMLRuntimeException */ - public static FrameReader createFrameReader( ReadProperties props ) + public static FrameReader createFrameReader( ReadProperties rprops ) throws DMLRuntimeException { //check valid read properties - if( props == null ) + if( rprops == null ) throw new DMLRuntimeException("Failed to create frame reader with empty properties."); - FrameReader reader = null; - InputInfo iinfo = props.inputInfo; - - if( iinfo == InputInfo.TextCellInputInfo ) { - reader = new FrameReaderTextCell(); - } - else if( iinfo == InputInfo.CSVInputInfo ) { - reader = new FrameReaderTextCSV( 
props.formatProperties!=null ? (CSVFileFormatProperties)props.formatProperties : new CSVFileFormatProperties()); - } - else if( iinfo == InputInfo.BinaryBlockInputInfo ) { - reader = new FrameReaderBinaryBlock(); - } - else { - throw new DMLRuntimeException("Failed to create frame reader for unknown input info: " - + InputInfo.inputInfoToString(iinfo)); - } - - return reader; + InputInfo iinfo = rprops.inputInfo; + FileFormatProperties props = (iinfo==InputInfo.CSVInputInfo) ? ((rprops.formatProperties!=null) ? + (CSVFileFormatProperties)rprops.formatProperties : new CSVFileFormatProperties()) : null; + + return createFrameReader(iinfo, props); } @@ -114,7 +88,10 @@ public class FrameReaderFactory reader = new FrameReaderTextCSV( (CSVFileFormatProperties)props); } else if( iinfo == InputInfo.BinaryBlockInputInfo ) { - reader = new FrameReaderBinaryBlock(); + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) ) + reader = new FrameReaderBinaryBlockParallel(); + else + reader = new FrameReaderBinaryBlock(); } else { throw new DMLRuntimeException("Failed to create frame reader for unknown input info: " http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2163bfb4/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index b851704..aef736b 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -198,7 +198,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * @param size * @return */ - private static List<String> createColNames(int size) { + public static List<String> createColNames(int size) { ArrayList<String> ret = new ArrayList<String>(size); for( int i=1; i<=size; 
i++ ) ret.add(createColName(i)); @@ -210,7 +210,7 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * @param i * @return */ - private static String createColName(int i) { + public static String createColName(int i) { return "C" + i; }
