[SYSTEMML-630] New parallel csv/textcell frame readers, cleanup/tests

This patch introduces parallel frame readers for the csv and textcell formats. On a 1M x 1k scenario (8GB in binary, ~25-30GB in csv/text), multi-threaded reads led to the following performance improvements:
* Textcell: 930s (single-threaded) -> 83s (multi-threaded) * CSV: 538s (single-threaded) -> 51s (multi-threaded) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/dea42de1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/dea42de1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/dea42de1 Branch: refs/heads/master Commit: dea42de1ff7e8ada41ad50e4e941f080f82d2346 Parents: 288438b Author: Matthias Boehm <[email protected]> Authored: Wed Jun 8 00:08:27 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Wed Jun 8 01:24:02 2016 -0700 ---------------------------------------------------------------------- .../io/FrameReaderBinaryBlockParallel.java | 11 +- .../sysml/runtime/io/FrameReaderFactory.java | 10 +- .../sysml/runtime/io/FrameReaderTextCSV.java | 217 ++++++++++------- .../runtime/io/FrameReaderTextCSVParallel.java | 235 +++++++++++++++++++ .../sysml/runtime/io/FrameReaderTextCell.java | 132 ++++++----- .../runtime/io/FrameReaderTextCellParallel.java | 114 +++++++++ .../sysml/runtime/io/IOUtilFunctions.java | 28 +++ .../sysml/runtime/io/ReaderTextCSVParallel.java | 19 +- .../functions/frame/FrameReadWriteTest.java | 89 +++++-- 9 files changed, 662 insertions(+), 193 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java index d684af0..3571c37 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java +++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java @@ -63,16 +63,14 @@ public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock //create read tasks for all files ExecutorService pool = Executors.newFixedThreadPool(numThreads); ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>(); - for( Path lpath : getSequenceFilePaths(fs, path) ){ - ReadFileTask t = new ReadFileTask(lpath, job, fs, dest); - tasks.add(t); - } + for( Path lpath : getSequenceFilePaths(fs, path) ) + tasks.add(new ReadFileTask(lpath, job, fs, dest)); //wait until all tasks have been executed List<Future<Object>> rt = pool.invokeAll(tasks); pool.shutdown(); - //check for exceptions and aggregate nnz + //check for exceptions for( Future<Object> task : rt ) task.get(); } @@ -99,8 +97,7 @@ public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock } @Override - public Object call() throws Exception - { + public Object call() throws Exception { readBinaryBlockFrameFromSequenceFile(_path, _job, _fs, _dest); return null; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java index 51606df..b768450 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java @@ -80,12 +80,18 @@ public class FrameReaderFactory FrameReader reader = null; if( iinfo == InputInfo.TextCellInputInfo ) { - reader = new FrameReaderTextCell(); + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) ) + reader = new FrameReaderTextCellParallel(); + else + reader = new FrameReaderTextCell(); } else if( iinfo == InputInfo.CSVInputInfo ) { if( 
props!=null && !(props instanceof CSVFileFormatProperties) ) throw new DMLRuntimeException("Wrong type of file format properties for CSV writer."); - reader = new FrameReaderTextCSV( (CSVFileFormatProperties)props); + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) ) + reader = new FrameReaderTextCSVParallel( (CSVFileFormatProperties)props ); + else + reader = new FrameReaderTextCSV( (CSVFileFormatProperties)props ); } else if( iinfo == InputInfo.BinaryBlockInputInfo ) { if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS) ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java index 2086ea3..d282aef 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java @@ -19,32 +19,37 @@ package org.apache.sysml.runtime.io; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import org.apache.hadoop.fs.FileStatus; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.parser.Expression.ValueType; import 
org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.matrix.CSVReblockMR; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.util.UtilFunctions; +/** + * Single-threaded frame text csv reader. + * + */ public class FrameReaderTextCSV extends FrameReader { - - private CSVFileFormatProperties _props = null; + protected CSVFileFormatProperties _props = null; - public FrameReaderTextCSV(CSVFileFormatProperties props) - { + public FrameReaderTextCSV(CSVFileFormatProperties props) { _props = props; } @@ -61,15 +66,10 @@ public class FrameReaderTextCSV extends FrameReader * @throws IOException */ @Override - public FrameBlock readFrameFromHDFS(String fname, List<ValueType> schema, List<String> names, + public final FrameBlock readFrameFromHDFS(String fname, List<ValueType> schema, List<String> names, long rlen, long clen) throws IOException, DMLRuntimeException { - //allocate output frame block - FrameBlock ret = null; - if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file size for frame w/ unknown dimensions - ret = createOutputFrameBlock(schema, names, rlen); - //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); FileSystem fs = FileSystem.get(job); @@ -77,13 +77,52 @@ public class FrameReaderTextCSV extends FrameReader //check existence and non-empty file checkValidInputFile(fs, path); + + //compute size if necessary + if( rlen <= 0 || clen <= 0 ) { + Pair<Integer,Integer> size = computeCSVSize(path, job, fs); + rlen = size.getKey(); + clen = size.getValue(); + } + + //allocate output frame block + List<ValueType> lschema = createOutputSchema(schema, clen); + List<String> lnames = createOutputNames(names, clen); + FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen); - //core read - ret = readCSVFrameFromHDFS(path, job, fs, ret, 
schema, names, rlen, clen, - _props.hasHeader(), _props.getDelim(), _props.isFill() ); + //core read (sequential/parallel) + readCSVFrameFromHDFS(path, job, fs, ret, lschema, lnames, rlen, clen); return ret; } + + /** + * + * @param path + * @param job + * @param fs + * @param dest + * @param schema + * @param names + * @param rlen + * @param clen + * @return + * @throws IOException + */ + protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, + FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen) + throws IOException + { + FileInputFormat.addInputPath(job, path); + TextInputFormat informat = new TextInputFormat(); + informat.configure(job); + InputSplit[] splits = informat.getSplits(job, 1); + splits = IOUtilFunctions.sortInputSplits(splits); + for( int i=0; i<splits.length; i++ ) + readCSVFrameFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, 0, i==0); + } + + /** * @@ -99,72 +138,60 @@ public class FrameReaderTextCSV extends FrameReader * @return * @throws IOException */ - @SuppressWarnings("unchecked") - private FrameBlock readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, - List<ValueType> schema, List<String> names, long rlen, long clen, boolean hasHeader, String delim, boolean fill) + protected final void readCSVFrameFrameFromInputSplit( InputSplit split, TextInputFormat informat, JobConf job, + FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen, int rl, boolean first) throws IOException { - ArrayList<Path> files=new ArrayList<Path>(); - if(fs.isDirectory(path)) { - for(FileStatus stat: fs.listStatus(path, CSVReblockMR.hiddenFileFilter)) - files.add(stat.getPath()); - Collections.sort(files); - } - else - files.add(path); + boolean hasHeader = _props.hasHeader(); + boolean isFill = _props.isFill(); + double dfillValue = _props.getFillValue(); + String sfillValue = String.valueOf(_props.getFillValue()); + String delim = 
_props.getDelim(); - if ( dest == null ) { - dest = computeCSVSize(files, fs, schema, names, hasHeader, delim); - clen = dest.getNumColumns(); - } - - ///////////////////////////////////////// - String value = null; - int row = 0; + //create record reader + RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL); + LongWritable key = new LongWritable(); + Text value = new Text(); + int row = rl; int col = -1; - for(int fileNo=0; fileNo<files.size(); fileNo++) - { - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))); - if(fileNo==0 && hasHeader ) - br.readLine(); //ignore header + //handle header if existing + if(first && hasHeader ) + reader.next(key, value); //ignore header - // Read the data - boolean emptyValuesFound = false; - try + // Read the data + boolean emptyValuesFound = false; + try + { + while( reader.next(key, value) ) //foreach line { - while( (value=br.readLine())!=null ) //foreach line + String cellStr = value.toString().trim(); + emptyValuesFound = false; col = 0; + String[] parts = IOUtilFunctions.split(cellStr, delim); + + for( String part : parts ) //foreach cell { - String cellStr = value.toString().trim(); - emptyValuesFound = false; - String[] parts = IOUtilFunctions.split(cellStr, delim); - col = 0; - - for( String part : parts ) //foreach cell - { - part = part.trim(); - if ( part.isEmpty() ) { - //TODO: Do we need to handle empty cell condition? 
- emptyValuesFound = true; - } - else { - dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), part)); - } - col++; + part = part.trim(); + if ( part.isEmpty() ) { + if( isFill && dfillValue!=0 ) + dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), sfillValue)); + emptyValuesFound = true; } - - //sanity checks for empty values and number of columns - IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound); - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen); - row++; + else { + dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), part)); + } + col++; } - } - finally { - IOUtilFunctions.closeSilently(br); + + //sanity checks for empty values and number of columns + IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, isFill, emptyValuesFound); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns("", cellStr, parts, clen); + row++; } } - - return dest; + finally { + IOUtilFunctions.closeSilently(reader); + } } /** @@ -178,35 +205,45 @@ public class FrameReaderTextCSV extends FrameReader * @return * @throws IOException */ - private FrameBlock computeCSVSize ( List<Path> files, FileSystem fs, List<ValueType> schema, List<String> names, boolean hasHeader, String delim) + protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs) throws IOException { - int nrow = 0; - for(int fileNo=0; fileNo<files.size(); fileNo++) + FileInputFormat.addInputPath(job, path); + TextInputFormat informat = new TextInputFormat(); + informat.configure(job); + InputSplit[] splits = informat.getSplits(job, 1); + splits = IOUtilFunctions.sortInputSplits(splits); + + boolean first = true; + int ncol = -1; + int nrow = -1; + + for( InputSplit split : splits ) { - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))); + RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL); + LongWritable key = 
new LongWritable(); + Text value = new Text(); + try { - // Read the header line, if there is one. - if(fileNo==0) - { - if ( hasHeader ) - br.readLine(); //ignore header + //read head and first line to determine num columns + if( first ) { + if ( _props.hasHeader() ) + reader.next(key, value); //ignore header + reader.next(key, value); + ncol = StringUtils.countMatches(value.toString(), _props.getDelim()) + 1; + nrow = 1; first = false; } - while ( br.readLine() != null ) { + //count remaining number of rows + while ( reader.next(key, value) ) nrow++; - } } finally { - IOUtilFunctions.closeSilently(br); + IOUtilFunctions.closeSilently(reader); } } - //create new frame block - FrameBlock frameBlock = new FrameBlock(schema, names); - frameBlock.ensureAllocatedColumns(nrow); - return frameBlock; + return new Pair<Integer,Integer>(nrow, ncol); } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java new file mode 100644 index 0000000..b998fce --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.Pair; + +/** + * Multi-threaded frame text csv reader. 
+ * + */ +public class FrameReaderTextCSVParallel extends FrameReaderTextCSV +{ + public FrameReaderTextCSVParallel(CSVFileFormatProperties props) { + super(props); + } + + /** + * + * @param path + * @param job + * @param fs + * @param dest + * @param schema + * @param names + * @param rlen + * @param clen + * @return + * @throws IOException + */ + @Override + protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, + FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen) + throws IOException + { + int numThreads = OptimizerUtils.getParallelTextReadParallelism(); + + FileInputFormat.addInputPath(job, path); + TextInputFormat informat = new TextInputFormat(); + informat.configure(job); + InputSplit[] splits = informat.getSplits(job, numThreads); + splits = IOUtilFunctions.sortInputSplits(splits); + + try + { + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + + //compute num rows per split + ArrayList<CountRowsTask> tasks = new ArrayList<CountRowsTask>(); + for( int i=0; i<splits.length; i++ ) + tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0)); + List<Future<Long>> cret = pool.invokeAll(tasks); + + //compute row offset per split via cumsum on row counts + long offset = 0; + List<Long> offsets = new ArrayList<Long>(); + for( Future<Long> count : cret ) { + offsets.add(offset); + offset += count.get(); + } + + //read individial splits + ArrayList<ReadRowsTask> tasks2 = new ArrayList<ReadRowsTask>(); + for( int i=0; i<splits.length; i++ ) + tasks2.add( new ReadRowsTask(splits[i], informat, job, dest, offsets.get(i).intValue(), i==0)); + List<Future<Object>> rret = pool.invokeAll(tasks2); + pool.shutdown(); + + //error handling + for( Future<Object> read : rret ) + read.get(); + } + catch (Exception e) { + throw new IOException("Failed parallel read of text csv input.", e); + } + } + + @Override + protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, 
FileSystem fs) + throws IOException + { + int numThreads = OptimizerUtils.getParallelTextReadParallelism(); + + FileInputFormat.addInputPath(job, path); + TextInputFormat informat = new TextInputFormat(); + informat.configure(job); + InputSplit[] splits = informat.getSplits(job, numThreads); + + //compute number of columns + RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[0], job, Reporter.NULL); + LongWritable key = new LongWritable(); + Text value = new Text(); + reader.next(key, value); + int ncol = StringUtils.countMatches(value.toString(), _props.getDelim()) + 1; + reader.close(); + + //compute number of rows + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + + //compute num rows per split + int nrow = 0; + try { + ArrayList<CountRowsTask> tasks = new ArrayList<CountRowsTask>(); + for( int i=0; i<splits.length; i++ ) + tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader(), i==0)); + List<Future<Long>> cret = pool.invokeAll(tasks); + for( Future<Long> count : cret ) + nrow += count.get().intValue(); + } + catch (Exception e) { + throw new IOException("Failed parallel read of text csv input.", e); + } + + return new Pair<Integer,Integer>(nrow, ncol); + } + + /** + * + * + */ + private static class CountRowsTask implements Callable<Long> + { + private InputSplit _split = null; + private TextInputFormat _informat = null; + private JobConf _job = null; + private boolean _hasHeader = false; + private boolean _firstSplit = false; + + public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, boolean first) { + _split = split; + _informat = informat; + _job = job; + _hasHeader = hasHeader; + _firstSplit = first; + } + + @Override + public Long call() + throws Exception + { + + RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL); + LongWritable key = new LongWritable(); + Text value = new Text(); + long nrows = 0; + + // 
count rows from the first non-header row + try { + if ( _firstSplit && _hasHeader ) + reader.next(key, value); + while (reader.next(key, value)) + nrows++; + } + finally { + IOUtilFunctions.closeSilently(reader); + } + + return nrows; + } + } + + /** + * + * + */ + private class ReadRowsTask implements Callable<Object> + { + private InputSplit _split = null; + private TextInputFormat _informat = null; + private JobConf _job = null; + private FrameBlock _dest = null; + private int _offset = -1; + private boolean _isFirstSplit = false; + + + public ReadRowsTask(InputSplit split, TextInputFormat informat, JobConf job, + FrameBlock dest, int offset, boolean first) + { + _split = split; + _informat = informat; + _job = job; + _dest = dest; + _offset = offset; + _isFirstSplit = first; + } + + @Override + public Object call() + throws Exception + { + readCSVFrameFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(), + _dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java index 2f42a64..8345b47 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java @@ -42,9 +42,12 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.UtilFunctions; +/** + * Single-threaded frame textcell reader. 
+ * + */ public class FrameReaderTextCell extends FrameReader { - /** * * @param fname @@ -57,12 +60,13 @@ public class FrameReaderTextCell extends FrameReader * @throws IOException */ @Override - public FrameBlock readFrameFromHDFS(String fname, List<ValueType> schema, List<String> names, - long rlen, long clen) - throws IOException, DMLRuntimeException + public final FrameBlock readFrameFromHDFS(String fname, List<ValueType> schema, List<String> names, long rlen, long clen) + throws IOException, DMLRuntimeException { //allocate output frame block - FrameBlock ret = createOutputFrameBlock(schema, names, rlen); + List<ValueType> lschema = createOutputSchema(schema, clen); + List<String> lnames = createOutputNames(names, clen); + FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); @@ -72,11 +76,8 @@ public class FrameReaderTextCell extends FrameReader //check existence and non-empty file checkValidInputFile(fs, path); - //core read - if( fs.isDirectory(path) ) - readTextCellFrameFromHDFS(path, job, ret, schema, names, rlen, clen); - else - readRawTextCellFrameFromHDFS(path, job, fs, ret, schema, names, rlen, clen); + //core read (sequential/parallel) + readTextCellFrameFromHDFS(path, job, fs, ret, lschema, lnames, rlen, clen); return ret; } @@ -90,7 +91,7 @@ public class FrameReaderTextCell extends FrameReader * @throws IOException * @throws DMLRuntimeException */ - public FrameBlock readFrameFromInputStream(InputStream is, long rlen, long clen) + public final FrameBlock readFrameFromInputStream(InputStream is, long rlen, long clen) throws IOException, DMLRuntimeException { return readFrameFromInputStream(is, getDefSchema(clen), getDefColNames(clen), rlen, clen); } @@ -106,83 +107,98 @@ public class FrameReaderTextCell extends FrameReader * @throws DMLRuntimeException * @throws IOException */ - public FrameBlock readFrameFromInputStream(InputStream is, 
List<ValueType> schema, List<String> names, long rlen, long clen) + public final FrameBlock readFrameFromInputStream(InputStream is, List<ValueType> schema, List<String> names, long rlen, long clen) throws IOException, DMLRuntimeException { //allocate output frame block - FrameBlock ret = createOutputFrameBlock(schema, names, rlen); + List<ValueType> lschema = createOutputSchema(schema, clen); + List<String> lnames = createOutputNames(names, clen); + FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen); //core read - readRawTextCellFrameFromInputStream(is, ret, schema, names, rlen, clen); + readRawTextCellFrameFromInputStream(is, ret, lschema, lnames, rlen, clen); return ret; } - /** * * @param path * @param job + * @param fs * @param dest * @param schema * @param names * @param rlen * @param clen - * @return * @throws IOException */ - private void readTextCellFrameFromHDFS( Path path, JobConf job, FrameBlock dest, + protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen) throws IOException { - FileInputFormat.addInputPath(job, path); - TextInputFormat informat = new TextInputFormat(); - informat.configure(job); - InputSplit[] splits = informat.getSplits(job, 1); + if( fs.isDirectory(path) ) { + FileInputFormat.addInputPath(job, path); + TextInputFormat informat = new TextInputFormat(); + informat.configure(job); + InputSplit[] splits = informat.getSplits(job, 1); + for(InputSplit split: splits) + readTextCellFrameFromInputSplit(split, informat, job, dest); + } + else { + readRawTextCellFrameFromHDFS(path, job, fs, dest, schema, names, rlen, clen); + } + } + + /** + * + * @param split + * @param dest + * @param schema + * @param names + * @param rlen + * @param clen + * @throws IOException + */ + protected final void readTextCellFrameFromInputSplit( InputSplit split, TextInputFormat informat, JobConf job, FrameBlock dest) + throws IOException + { + 
List<ValueType> schema = dest.getSchema(); + int rlen = dest.getNumRows(); + int clen = dest.getNumColumns(); + + //create record reader + RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL); LongWritable key = new LongWritable(); Text value = new Text(); + FastStringTokenizer st = new FastStringTokenizer(' '); int row = -1; int col = -1; try { - FastStringTokenizer st = new FastStringTokenizer(' '); - - for(InputSplit split: splits) - { - RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL); - - try - { - while( reader.next(key, value) ) - { - st.reset( value.toString() ); //reinit tokenizer - row = st.nextInt()-1; - col = st.nextInt()-1; - dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), st.nextToken())); - } - } - finally - { - if( reader != null ) - reader.close(); - } + while( reader.next(key, value) ) { + st.reset( value.toString() ); //reinit tokenizer + row = st.nextInt()-1; + col = st.nextInt()-1; + dest.set(row, col, UtilFunctions.stringToObject(schema.get(col), st.nextToken())); } } - catch(Exception ex) + catch(Exception ex) { //post-mortem error handling and bounds checking - if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > clen ) - { + if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > clen ) { throw new IOException("Frame cell ["+(row+1)+","+(col+1)+"] " + "out of overall frame range [1:"+rlen+",1:"+clen+"]."); } - else - { + else { throw new IOException( "Unable to read frame in text cell format.", ex ); } } + finally { + IOUtilFunctions.closeSilently(reader); + } } @@ -199,7 +215,7 @@ public class FrameReaderTextCell extends FrameReader * @return * @throws IOException */ - private void readRawTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, + protected final void readRawTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen) 
throws IOException { @@ -221,21 +237,20 @@ public class FrameReaderTextCell extends FrameReader * @return * @throws IOException */ - private void readRawTextCellFrameFromInputStream( InputStream is, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen) - throws IOException + protected final void readRawTextCellFrameFromInputStream( InputStream is, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen) + throws IOException { + //create buffered reader BufferedReader br = new BufferedReader(new InputStreamReader( is )); String value = null; + FastStringTokenizer st = new FastStringTokenizer(' '); int row = -1; int col = -1; try { - FastStringTokenizer st = new FastStringTokenizer(' '); - - while( (value=br.readLine())!=null ) - { + while( (value=br.readLine())!=null ) { st.reset( value ); //reinit tokenizer row = st.nextInt()-1; col = st.nextInt()-1; @@ -245,18 +260,15 @@ public class FrameReaderTextCell extends FrameReader catch(Exception ex) { //post-mortem error handling and bounds checking - if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > clen ) - { + if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > clen ) { throw new IOException("Frame cell ["+(row+1)+","+(col+1)+"] " + "out of overall frame range [1:"+rlen+",1:"+clen+"].", ex); } - else - { + else { throw new IOException( "Unable to read frame in raw text cell format.", ex ); } } - finally - { + finally { IOUtilFunctions.closeSilently(br); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java new file mode 100644 index 0000000..ce3993f --- /dev/null +++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.parser.Expression.ValueType; +import org.apache.sysml.runtime.matrix.data.FrameBlock; + +/** + * Multi-threaded frame textcell reader. 
+ * + */ +public class FrameReaderTextCellParallel extends FrameReaderTextCell +{ + /** + * + * @param path + * @param job + * @param fs + * @param dest + * @param schema + * @param names + * @param rlen + * @param clen + * @throws IOException + */ + @Override + protected void readTextCellFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, + List<ValueType> schema, List<String> names, long rlen, long clen) + throws IOException + { + int numThreads = OptimizerUtils.getParallelTextReadParallelism(); + + FileInputFormat.addInputPath(job, path); + TextInputFormat informat = new TextInputFormat(); + informat.configure(job); + + try + { + //create read tasks for all splits + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + InputSplit[] splits = informat.getSplits(job, numThreads); + ArrayList<ReadTask> tasks = new ArrayList<ReadTask>(); + for( InputSplit split : splits ) + tasks.add(new ReadTask(split, informat, job, dest)); + + //wait until all tasks have been executed + List<Future<Object>> rt = pool.invokeAll(tasks); + pool.shutdown(); + + //check for exceptions + for( Future<Object> task : rt ) + task.get(); + } + catch (Exception e) { + throw new IOException("Failed parallel read of text cell input.", e); + } + } + + /** + * + */ + public class ReadTask implements Callable<Object> + { + private InputSplit _split = null; + private TextInputFormat _informat = null; + private JobConf _job = null; + private FrameBlock _dest = null; + + public ReadTask( InputSplit split, TextInputFormat informat, JobConf job, FrameBlock dest ) { + _split = split; + _informat = informat; + _job = job; + _dest = dest; + } + + @Override + public Object call() throws Exception { + readTextCellFrameFromInputSplit(_split, _informat, _job, _dest); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 69efec0..93a8818 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -24,10 +24,15 @@ import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.Comparator; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.RecordReader; import org.apache.sysml.runtime.util.LocalFileUtils; import org.apache.sysml.runtime.util.UtilFunctions; @@ -156,4 +161,27 @@ public class IOUtilFunctions input.close(); return bos.toString("UTF-8"); } + + /** + * + * @param splits + * @return + */ + public static InputSplit[] sortInputSplits(InputSplit[] splits) { + if (splits[0] instanceof FileSplit) { + // The splits do not always arrive in order by file name. + // Sort the splits lexicographically by path so that the header will + // be in the first split. 
+ // Note that we're assuming that the splits come in order by offset + Arrays.sort(splits, new Comparator<InputSplit>() { + @Override + public int compare(InputSplit o1, InputSplit o2) { + Path p1 = ((FileSplit) o1).getPath(); + Path p2 = ((FileSplit) o2).getPath(); + return p1.toString().compareTo(p2.toString()); + } + }); + } + return splits; + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 64c055c..8041ba7 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -21,8 +21,6 @@ package org.apache.sysml.runtime.io; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -33,7 +31,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -84,21 +81,7 @@ public class ReaderTextCSVParallel extends MatrixReader informat.configure(job); InputSplit[] splits = informat.getSplits(job, _numThreads); - - if (splits[0] instanceof FileSplit) { - // The splits do not always arrive in order by file name. - // Sort the splits lexicographically by path so that the header will - // be in the first split. 
- // Note that we're assuming that the splits come in order by offset - Arrays.sort(splits, new Comparator<InputSplit>() { - @Override - public int compare(InputSplit o1, InputSplit o2) { - Path p1 = ((FileSplit) o1).getPath(); - Path p2 = ((FileSplit) o2).getPath(); - return p1.toString().compareTo(p2.toString()); - } - }); - } + splits = IOUtilFunctions.sortInputSplits(splits); // check existence and non-empty file checkValidInputFile(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java index b07d09e..7d45ebc 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java @@ -62,35 +62,94 @@ public class FrameReadWriteTest extends AutomatedTestBase } @Test - public void testFrameStringsStrings() { - runFrameReadWriteTest(schemaStrings, schemaStrings, false); + public void testFrameStringsStringsBinary() { + runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, schemaStrings, schemaStrings, false); } @Test - public void testFrameStringsStringsParallel() { - runFrameReadWriteTest(schemaStrings, schemaStrings, true); + public void testFrameStringsStringsBinaryParallel() { + runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, schemaStrings, schemaStrings, true); } @Test - public void testFrameMixedStrings() { - runFrameReadWriteTest(schemaMixed, schemaStrings, false); + public void testFrameMixedStringsBinary() { + runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, schemaMixed, schemaStrings, false); } @Test - public void testFrameStringsMixedParallel() { 
- runFrameReadWriteTest(schemaStrings, schemaMixed, true); + public void testFrameStringsMixedBinaryParallel() { + runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, schemaStrings, schemaMixed, true); } @Test - public void testFrameMixedMixed() { - runFrameReadWriteTest(schemaMixed, schemaMixed, false); + public void testFrameMixedMixedBinary() { + runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, schemaMixed, schemaMixed, false); } @Test - public void testFrameMixedMixedParallel() { - runFrameReadWriteTest(schemaMixed, schemaMixed, true); + public void testFrameMixedMixedBinaryParallel() { + runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, schemaMixed, schemaMixed, true); } + @Test + public void testFrameStringsStringsTextCell() { + runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, schemaStrings, schemaStrings, false); + } + + @Test + public void testFrameStringsStringsTextCellParallel() { + runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, schemaStrings, schemaStrings, true); + } + + @Test + public void testFrameMixedStringsTextCell() { + runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, schemaMixed, schemaStrings, false); + } + + @Test + public void testFrameStringsMixedTextCellParallel() { + runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, schemaStrings, schemaMixed, true); + } + + @Test + public void testFrameMixedMixedTextCell() { + runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, schemaMixed, schemaMixed, false); + } + + @Test + public void testFrameMixedMixedTextCellParallel() { + runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, schemaMixed, schemaMixed, true); + } + + @Test + public void testFrameStringsStringsTextCSV() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaStrings, schemaStrings, false); + } + + @Test + public void testFrameStringsStringsTextCSVParallel() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaStrings, schemaStrings, true); + } + + @Test + public void 
testFrameMixedStringsTextCSV() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaMixed, schemaStrings, false); + } + + @Test + public void testFrameStringsMixedTextCSVParallel() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaStrings, schemaMixed, true); + } + + @Test + public void testFrameMixedMixedTextCSV() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaMixed, schemaMixed, false); + } + + @Test + public void testFrameMixedMixedTextCSVParallel() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaMixed, schemaMixed, true); + } /** * @@ -98,7 +157,7 @@ public class FrameReadWriteTest extends AutomatedTestBase * @param sparseM2 * @param instType */ - private void runFrameReadWriteTest( ValueType[] schema1, ValueType[] schema2, boolean parallel) + private void runFrameReadWriteTest( OutputInfo oinfo, ValueType[] schema1, ValueType[] schema2, boolean parallel) { boolean oldParText = CompilerConfig.FLAG_PARREADWRITE_TEXT; boolean oldParBin = CompilerConfig.FLAG_PARREADWRITE_BINARY; @@ -129,9 +188,7 @@ public class FrameReadWriteTest extends AutomatedTestBase fprop.setDelim(DELIMITER); fprop.setHeader(HEADER); - writeAndVerifyData(OutputInfo.TextCellOutputInfo, frame1, frame2, fprop); - writeAndVerifyData(OutputInfo.CSVOutputInfo, frame1, frame2, fprop); - writeAndVerifyData(OutputInfo.BinaryBlockOutputInfo, frame1, frame2, fprop); + writeAndVerifyData(oinfo, frame1, frame2, fprop); } catch(Exception ex) { ex.printStackTrace();
