Repository: incubator-systemml Updated Branches: refs/heads/master 6bbae619a -> dea42de1f
[SYSTEMML-630] New parallel csv/textcell frame writers, cleanup This patch adds parallel frame writers for csv and textcell, similar to the existing parallel binary block frame writer. On a 1M x 1K frame (8GB in binary, ~25-30GB in csv/text), multi-threaded writes achieved the following improvements: * Textcell: 588s (single-threaded) -> 59s (multi-threaded) * CSV: 518s (single-threaded) -> 47s (multi-threaded) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/fdf55181 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/fdf55181 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/fdf55181 Branch: refs/heads/master Commit: fdf551813572d6fd4f0d8076a525d4b1b3ab1b53 Parents: 6bbae61 Author: Matthias Boehm <[email protected]> Authored: Tue Jun 7 13:31:15 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Jun 7 13:31:15 2016 -0700 ---------------------------------------------------------------------- .../io/FrameWriterBinaryBlockParallel.java | 7 +- .../sysml/runtime/io/FrameWriterFactory.java | 10 +- .../sysml/runtime/io/FrameWriterTextCSV.java | 62 ++++++--- .../runtime/io/FrameWriterTextCSVParallel.java | 133 +++++++++++++++++++ .../sysml/runtime/io/FrameWriterTextCell.java | 68 ++++++---- .../runtime/io/FrameWriterTextCellParallel.java | 128 ++++++++++++++++++ .../sysml/runtime/matrix/data/FrameBlock.java | 44 +++++- 7 files changed, 396 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java index 
f0b5bd1..2ef7692 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java @@ -56,13 +56,12 @@ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock * @throws DMLRuntimeException */ protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen ) - throws IOException, DMLRuntimeException + throws IOException, DMLRuntimeException { //estimate output size and number of output blocks (min 1) int blen = ConfigurationManager.getBlocksize(); - int numPartFiles = (int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, blen, blen, rlen*clen) - / InfrastructureAnalyzer.getHDFSBlockSize()); - numPartFiles = Math.max(numPartFiles, 1); + int numPartFiles = Math.max((int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, blen, blen, rlen*clen) + / InfrastructureAnalyzer.getHDFSBlockSize()), 1); //determine degree of parallelism int numThreads = OptimizerUtils.getParallelBinaryWriteParallelism(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java index a7f132f..c1021e6 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterFactory.java @@ -57,12 +57,18 @@ public class FrameWriterFactory FrameWriter writer = null; if( oinfo == OutputInfo.TextCellOutputInfo ) { - writer = new FrameWriterTextCell(); + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS) ) + writer = new FrameWriterTextCellParallel(); + else + writer = new FrameWriterTextCell(); } else if( oinfo 
== OutputInfo.CSVOutputInfo ) { if( props!=null && !(props instanceof CSVFileFormatProperties) ) throw new DMLRuntimeException("Wrong type of file format properties for CSV writer."); - writer = new FrameWriterTextCSV((CSVFileFormatProperties)props); + if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_TEXTFORMATS) ) + writer = new FrameWriterTextCSVParallel((CSVFileFormatProperties)props); + else + writer = new FrameWriterTextCSV((CSVFileFormatProperties)props); } else if( oinfo == OutputInfo.BinaryBlockOutputInfo ) { if( ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS) ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java index 455f44d..addf798 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java @@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.util.MapReduceTool; /** + * Single-threaded frame text csv writer. 
* */ public class FrameWriterTextCSV extends FrameWriter @@ -58,22 +59,23 @@ public class FrameWriterTextCSV extends FrameWriter * @throws DMLRuntimeException */ @Override - public void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen) + public final void writeFrameToHDFS(FrameBlock src, String fname, long rlen, long clen) throws IOException, DMLRuntimeException { - //validity check frame dimensions - if( src.getNumRows() != rlen || src.getNumColumns() != clen ) { - throw new IOException("Frame dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+"."); - } - //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path( fname ); //if the file already exists on HDFS, remove it. MapReduceTool.deleteFileIfExistOnHDFS( fname ); - - //core write + + //validity check frame dimensions + if( src.getNumRows() != rlen || src.getNumColumns() != clen ) { + throw new IOException("Frame dimensions mismatch with metadata: " + + src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+"."); + } + + //core write (sequential/parallel) writeCSVFrameToHDFS(path, job, src, rlen, clen, _props); } @@ -84,16 +86,36 @@ public class FrameWriterTextCSV extends FrameWriter * @param src * @param rlen * @param clen + * @param csvprops + * @throws IOException + */ + protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties csvprops ) + throws IOException + { + FileSystem fs = FileSystem.get(job); + + //sequential write to single text file + writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, csvprops); + } + + /** + * + * @param path + * @param job + * @param src + * @param rlen + * @param clen * @param props * @return * @throws IOException */ - protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties props ) + protected final void 
writeCSVFrameToFile( Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, CSVFileFormatProperties props ) throws IOException { - FileSystem fs = FileSystem.get(job); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - + //create buffered writer + BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); + int cols = src.getNumColumns(); + try { //for obj reuse and preventing repeated buffer re-allocations @@ -103,13 +125,13 @@ public class FrameWriterTextCSV extends FrameWriter String delim = props.getDelim(); // Write header line, if needed - if( props.hasHeader() ) + if( props.hasHeader() && rl==0 ) { //write row chunk-wise to prevent OOM on large number of columns - for( int bj=0; bj<clen; bj+=BLOCKSIZE_J ) { - for( int j=bj; j < Math.min(clen,bj+BLOCKSIZE_J); j++) { + for( int bj=0; bj<cols; bj+=BLOCKSIZE_J ) { + for( int j=bj; j < Math.min(cols,bj+BLOCKSIZE_J); j++) { sb.append("C"+ (j+1)); - if ( j < clen-1 ) + if ( j < cols-1 ) sb.append(delim); } br.write( sb.toString() ); @@ -121,15 +143,15 @@ public class FrameWriterTextCSV extends FrameWriter } // Write data lines - Iterator<String[]> iter = src.getStringRowIterator(); + Iterator<String[]> iter = src.getStringRowIterator(rl, ru); while( iter.hasNext() ) { //write row chunk-wise to prevent OOM on large number of columns String[] row = iter.next(); - for( int bj=0; bj<clen; bj+=BLOCKSIZE_J ) { - for( int j=bj; j<Math.min(clen,bj+BLOCKSIZE_J); j++ ) { + for( int bj=0; bj<cols; bj+=BLOCKSIZE_J ) { + for( int j=bj; j<Math.min(cols,bj+BLOCKSIZE_J); j++ ) { if(row[j] != null) sb.append(row[j]); - if( j != clen-1 ) + if( j != cols-1 ) sb.append(delim); } br.write( sb.toString() ); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java new file mode 100644 index 0000000..de492e2 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sysml.conf.DMLConfig; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.MapReduceTool; + +/** + * Multi-threaded frame text csv writer.
+ * + */ +public class FrameWriterTextCSVParallel extends FrameWriterTextCSV +{ + public FrameWriterTextCSVParallel( CSVFileFormatProperties props ) { + super(props); + } + + /** + * + * @param path + * @param job + * @param src + * @param rlen + * @param clen + * @param csvprops + * @throws IOException + */ + @Override + protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties csvprops ) + throws IOException + { + //estimate output size and number of output blocks (min 1) + int numPartFiles = Math.max((int)(OptimizerUtils.estimateSizeTextOutput(rlen, clen, rlen*clen, + OutputInfo.CSVOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()), 1); + + //determine degree of parallelism + int numThreads = OptimizerUtils.getParallelTextWriteParallelism(); + numThreads = Math.min(numThreads, numPartFiles); + + //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file + if( numThreads <= 1 ) { + super.writeCSVFrameToHDFS(path, job, src, rlen, clen, csvprops); + return; + } + + //create directory for concurrent tasks + MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + FileSystem fs = FileSystem.get(job); + + //create and execute tasks + try + { + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>(); + int blklen = (int)Math.ceil((double)rlen / numThreads); + for(int i=0; i<numThreads & i*blklen<rlen; i++) { + Path newPath = new Path(path, String.format("0-m-%05d",i)); + tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops)); + } + + //wait until all tasks have been executed + List<Future<Object>> rt = pool.invokeAll(tasks); + pool.shutdown(); + + //check for exceptions + for( Future<Object> task : rt ) + task.get(); + } + catch (Exception e) { + throw new IOException("Failed parallel write of csv 
output.", e); + } + } + + private class WriteFileTask implements Callable<Object> + { + private Path _path = null; + private JobConf _job = null; + private FileSystem _fs = null; + private FrameBlock _src = null; + private int _rl = -1; + private int _ru = -1; + private CSVFileFormatProperties _csvprops = null; + + public WriteFileTask(Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, CSVFileFormatProperties csvprops) { + _path = path; + _fs = fs; + _job = job; + _src = src; + _rl = rl; + _ru = ru; + _csvprops = csvprops; + } + + @Override + public Object call() throws Exception { + writeCSVFrameToFile(_path, _job, _fs, _src, _rl, _ru, _csvprops); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java index 8064ce5..38348ad 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java @@ -32,6 +32,10 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.util.MapReduceTool; +/** + * Single-threaded frame text cell writer. 
+ * + */ public class FrameWriterTextCell extends FrameWriter { /** @@ -42,22 +46,23 @@ public class FrameWriterTextCell extends FrameWriter * @throws DMLRuntimeException */ @Override - public void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen ) + public final void writeFrameToHDFS( FrameBlock src, String fname, long rlen, long clen ) throws IOException, DMLRuntimeException { - //validity check frame dimensions - if( src.getNumRows() != rlen || src.getNumColumns() != clen ) { - throw new IOException("Frame dimensions mismatch with metadata: "+src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+"."); - } - //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path( fname ); //if the file already exists on HDFS, remove it. MapReduceTool.deleteFileIfExistOnHDFS( fname ); - - //core write + + //validity check frame dimensions + if( src.getNumRows() != rlen || src.getNumColumns() != clen ) { + throw new IOException("Frame dimensions mismatch with metadata: " + + src.getNumRows()+"x"+src.getNumColumns()+" vs "+rlen+"x"+clen+"."); + } + + //core write (sequential/parallel) writeTextCellFrameToHDFS(path, job, src, src.getNumRows(), src.getNumColumns()); } @@ -68,38 +73,51 @@ public class FrameWriterTextCell extends FrameWriter * @param src * @param rlen * @param clen - * @param brlen - * @param bclen * @throws IOException */ protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen ) throws IOException { - boolean entriesWritten = false; FileSystem fs = FileSystem.get(job); - BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - int rows = src.getNumRows(); - int cols = src.getNumColumns(); + //sequential write to single text file + writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen); + } + + /** + * Internal primitive to write a row range of a frame to a single text file, + * which is used for both 
single- and multi-threaded writers (for consistency). + * + * @param path + * @param job + * @param fs + * @param src + * @param rl + * @param ru + * @throws IOException + */ + protected final void writeTextCellFrameToFile( Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru ) + throws IOException + { + boolean entriesWritten = false; + int cols = src.getNumColumns(); + + //create buffered writer + BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - //bound check per block - if( rows > rlen || cols > clen ) { - throw new IOException("Frame block [1:"+rows+",1:"+cols+"] " + - "out of overall frame range [1:"+rlen+",1:"+clen+"]."); - } - try { //for obj reuse and preventing repeated buffer re-allocations StringBuilder sb = new StringBuilder(); - Iterator<String[]> iter = src.getStringRowIterator(); - for( int i=0; iter.hasNext(); i++ ) { //for all rows + //write frame row range to output + Iterator<String[]> iter = src.getStringRowIterator(rl, ru); + for( int i=rl; iter.hasNext(); i++ ) { //for all rows String rowIndex = Integer.toString(i+1); String[] row = iter.next(); for( int j=0; j<cols; j++ ) { if( row[j] != null ) { - sb.append(rowIndex); + sb.append( rowIndex ); sb.append(' '); sb.append( j+1 ); sb.append(' '); @@ -119,6 +137,6 @@ public class FrameWriterTextCell extends FrameWriter } finally { IOUtilFunctions.closeSilently(br); - } - } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java new file mode 100644 index 0000000..82a139d --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.io; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.sysml.conf.DMLConfig; +import org.apache.sysml.hops.OptimizerUtils; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.MapReduceTool; + +/** + * Multi-threaded frame text cell writer. 
+ * + */ +public class FrameWriterTextCellParallel extends FrameWriterTextCell +{ + /** + * + * @param path + * @param job + * @param src + * @param rlen + * @param clen + * @throws IOException + */ + @Override + protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen ) + throws IOException + { + //estimate output size and number of output blocks (min 1) + int numPartFiles = Math.max((int)(OptimizerUtils.estimateSizeTextOutput(rlen, clen, rlen*clen, + OutputInfo.TextCellOutputInfo) / InfrastructureAnalyzer.getHDFSBlockSize()), 1); + + //determine degree of parallelism + int numThreads = OptimizerUtils.getParallelTextWriteParallelism(); + numThreads = Math.min(numThreads, numPartFiles); + + //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file + if( numThreads <= 1 ) { + super.writeTextCellFrameToHDFS(path, job, src, rlen, clen); + return; + } + + //create directory for concurrent tasks + MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + FileSystem fs = FileSystem.get(job); + + //create and execute tasks + try + { + ExecutorService pool = Executors.newFixedThreadPool(numThreads); + ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>(); + int blklen = (int)Math.ceil((double)rlen / numThreads); + for(int i=0; i<numThreads & i*blklen<rlen; i++) { + Path newPath = new Path(path, String.format("0-m-%05d",i)); + tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); + } + + //wait until all tasks have been executed + List<Future<Object>> rt = pool.invokeAll(tasks); + pool.shutdown(); + + //check for exceptions + for( Future<Object> task : rt ) + task.get(); + } + catch (Exception e) { + throw new IOException("Failed parallel write of text output.", e); + } + } + + /** + * + */ + private class WriteFileTask implements Callable<Object> + { + private Path _path = null; + private JobConf _job 
= null; + private FileSystem _fs = null; + private FrameBlock _src = null; + private int _rl = -1; + private int _ru = -1; + + public WriteFileTask(Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru) { + _path = path; + _fs = fs; + _job = job; + _src = src; + _rl = rl; + _ru = ru; + } + + @Override + public Object call() throws Exception { + writeTextCellFrameToFile(_path, _job, _fs, _src, _rl, _ru); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/fdf55181/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 268ba44..b851704 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -348,7 +348,19 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * @return */ public Iterator<String[]> getStringRowIterator() { - return new StringRowIterator(); + return new StringRowIterator(0, _numRows); + } + + /** + * Get a row iterator over the frame where all fields are encoded + * as strings independent of their value types. + * + * @param rl + * @param ru + * @return + */ + public Iterator<String[]> getStringRowIterator(int rl, int ru) { + return new StringRowIterator(rl, ru); } /** @@ -358,7 +370,19 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * @return */ public Iterator<Object[]> getObjectRowIterator() { - return new ObjectRowIterator(); + return new ObjectRowIterator(0, _numRows); + } + + /** + * Get a row iterator over the frame where all fields are encoded + * as boxed objects according to their value types. 
+ * + * @param rl + * @param ru + * @return + */ + public Iterator<Object[]> getObjectRowIterator(int rl, int ru) { + return new ObjectRowIterator(rl, ru); } /////// @@ -744,15 +768,17 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable private abstract class RowIterator<T> implements Iterator<T[]> { protected T[] _curRow = null; protected int _curPos = -1; + protected int _maxPos = -1; - protected RowIterator() { - _curPos = 0; + protected RowIterator(int rl, int ru) { + _curPos = rl; + _maxPos = ru; _curRow = createRow(getNumColumns()); } @Override public boolean hasNext() { - return (_curPos < _numRows); + return (_curPos < _maxPos); } @Override @@ -767,6 +793,10 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * */ private class StringRowIterator extends RowIterator<String> { + public StringRowIterator(int rl, int ru) { + super(rl, ru); + } + @Override protected String[] createRow(int size) { return new String[size]; @@ -787,6 +817,10 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable * */ private class ObjectRowIterator extends RowIterator<Object> { + public ObjectRowIterator(int rl, int ru) { + super(rl, ru); + } + @Override protected Object[] createRow(int size) { return new Object[size];
