Repository: systemml Updated Branches: refs/heads/master 7602af94f -> 717831daf
[SYSTEMML-1697] Fix cp multi-part binary read from object stores Our CP binary readers directly read from sequence files instead of record readers. When reading from object stores this creates problems for multi-part files because the parent directory does not physically exist. Furthermore, the listing of part files also contains the mtd file because it shares the common directory prefix. This patch makes the binary readers more robust for such file systems that are not compliant with the HDFS file system API. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/717831da Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/717831da Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/717831da Branch: refs/heads/master Commit: 717831daff16d4c87381e6b8d4f48baa9c2922b4 Parents: 7602af9 Author: Matthias Boehm <[email protected]> Authored: Wed Jun 14 20:18:44 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Wed Jun 14 20:22:44 2017 -0700 ---------------------------------------------------------------------- .../parfor/DataPartitionerLocal.java | 7 ++-- .../controlprogram/parfor/RemoteDPParForMR.java | 3 +- .../controlprogram/parfor/RemoteParForMR.java | 3 +- .../parfor/ResultMergeLocalFile.java | 7 ++-- .../ParameterizedBuiltinCPFileInstruction.java | 5 +-- .../apache/sysml/runtime/io/FrameReader.java | 24 ------------ .../runtime/io/FrameReaderBinaryBlock.java | 4 +- .../io/FrameReaderBinaryBlockParallel.java | 2 +- .../sysml/runtime/io/IOUtilFunctions.java | 39 ++++++++++++++++++++ .../apache/sysml/runtime/io/MatrixReader.java | 24 ------------ .../sysml/runtime/io/ReaderBinaryBlock.java | 4 +- .../runtime/io/ReaderBinaryBlockParallel.java | 2 +- .../sysml/runtime/io/ReaderBinaryCell.java | 2 +- .../sysml/runtime/util/MapReduceTool.java | 4 +- 14 files changed, 59 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java index f0b5674..b57f050 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java @@ -48,7 +48,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.Cell; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils; import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -246,7 +245,7 @@ public class DataPartitionerLocal extends DataPartitioner MatrixIndexes key = new MatrixIndexes(); MatrixCell value = new MatrixCell(); - for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) ) + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try @@ -333,7 +332,7 @@ public class DataPartitionerLocal extends DataPartitioner MatrixIndexes key = new MatrixIndexes(); MatrixBlock value = new MatrixBlock(); - for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) ) + for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try @@ -408,7 +407,7 @@ public class DataPartitionerLocal extends DataPartitioner LinkedList<Cell> buffer = new LinkedList<Cell>(); - for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) ) + for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index 60d23c6..2bccdba 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -52,7 +52,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; @@ -253,7 +252,7 @@ public class RemoteDPParForMR Text value = new Text(); //serialized var header (incl filename) int countAll = 0; - for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) ) + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java index 67c3368..1a3b8c3 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java @@ -50,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat; import org.apache.sysml.runtime.instructions.cp.Data; import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; @@ -264,7 +263,7 @@ public class RemoteParForMR Text value = new Text(); //serialized var header (incl filename) int countAll = 0; - for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) ) + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index 7dae670..9b274be 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -50,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.parfor.util.Cell; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils; import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; import org.apache.sysml.runtime.matrix.data.IJV; @@ -340,7 +339,7 @@ public class ResultMergeLocalFile extends ResultMerge JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf()); Path tmpPath = new Path(in.getFileName()); - for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath) ) + for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath) ) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob); try @@ -477,7 +476,7 @@ public class ResultMergeLocalFile extends ResultMerge Path tmpPath = new Path(mo.getFileName()); FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob); - for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath)) + for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath)) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob); try @@ -576,7 +575,7 @@ public class ResultMergeLocalFile extends ResultMerge int brlen = mc.getRowsPerBlock(); int bclen = mc.getColsPerBlock(); - for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path)) + for(Path lpath: IOUtilFunctions.getSequenceFilePaths(fs, path)) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java index 6a7127f..3046c7e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java @@ -57,7 +57,6 @@ import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction; import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.io.MatrixWriter; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; @@ -330,7 +329,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC MatrixIndexes key = new MatrixIndexes(); MatrixCell value = new MatrixCell(); - for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path)) + for(Path lpath: IOUtilFunctions.getSequenceFilePaths(fs, path)) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try @@ -387,7 +386,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC MatrixBlock value = new MatrixBlock(); boolean diagBlocks = true; - for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path)) + for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path)) { SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java index 03271d8..584ff4d 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java @@ -22,9 +22,7 @@ package org.apache.sysml.runtime.io; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.util.LinkedList; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.sysml.hops.OptimizerUtils; @@ -86,28 +84,6 @@ public abstract class FrameReader return (clen < 0) ? new String[0] : FrameBlock.createColNames((int)clen); } - - public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) - throws IOException - { - Path[] ret = null; - - if( fs.isDirectory(file) ) - { - LinkedList<Path> tmp = new LinkedList<Path>(); - FileStatus[] dStatus = fs.listStatus(file); - for( FileStatus fdStatus : dStatus ) - if( !fdStatus.getPath().getName().startsWith("_") ) //skip internal files - tmp.add(fdStatus.getPath()); - ret = tmp.toArray(new Path[0]); - } - else - { - ret = new Path[]{ file }; - } - - return ret; - } /** * NOTE: mallocDense controls if the output matrix blocks is fully allocated, this can be redundant http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java index e08aa96..7bd2c00 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java @@ -73,7 +73,7 @@ public class FrameReaderBinaryBlock extends FrameReader throws IOException, DMLRuntimeException { //sequential read from sequence files - for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files readBinaryBlockFrameFromSequenceFile(lpath, job, fs, dest); } @@ -135,7 +135,7 @@ public class FrameReaderBinaryBlock extends FrameReader FrameBlock value = new FrameBlock(); //read first block from first file - Path lpath = getSequenceFilePaths(fs, path)[0]; + Path lpath = IOUtilFunctions.getSequenceFilePaths(fs, path)[0]; SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try { reader.next(key, value); http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java index 7b2e790..45718f9 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java @@ -52,7 +52,7 @@ public class FrameReaderBinaryBlockParallel extends FrameReaderBinaryBlock //create read tasks for all files ExecutorService pool = Executors.newFixedThreadPool(numThreads); ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>(); - for( Path lpath : getSequenceFilePaths(fs, path) ) + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) tasks.add(new ReadFileTask(lpath, job, fs, dest)); //wait until all tasks have been executed http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 44d9aee..12b7438 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -27,12 +27,14 @@ import java.io.StringReader; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.LinkedList; import org.apache.commons.io.input.ReaderInputStream; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -78,6 +80,14 @@ public class IOUtilFunctions || scheme1.equals(scheme2); } + public static boolean isObjectStoreFileScheme(Path path) { + if( path == null || path.toUri() == null || path.toUri().getScheme() == null ) + return false; + String scheme = path.toUri().getScheme(); + //capture multiple alternatives s3, s3n, s3a, swift, swift2d + return scheme.startsWith("s3") || scheme.startsWith("swift"); + } + public static void closeSilently( Closeable io ) { try { if( io != null ) @@ -436,6 +446,35 @@ public class IOUtilFunctions return ncol; } + public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) + throws IOException + { + Path[] ret = null; + + //Note on object stores: Since the object store file system implementations + //only emulate a file system, the directory of a multi-part file does not + //exist physically and hence the isDirectory call returns false. Furthermore, + //listStatus call returns all files with the given directory as prefix, which + //includes the mtd file which needs to be ignored accordingly. + + if( fs.isDirectory(file) + || IOUtilFunctions.isObjectStoreFileScheme(file) ) + { + LinkedList<Path> tmp = new LinkedList<Path>(); + FileStatus[] dStatus = fs.listStatus(file); + for( FileStatus fdStatus : dStatus ) + if( !fdStatus.getPath().getName().startsWith("_") //skip internal files + && !fdStatus.getPath().equals(file.toString()+".mtd") ) //mtd file + tmp.add(fdStatus.getPath()); + ret = tmp.toArray(new Path[0]); + } + else { + ret = new Path[]{ file }; + } + + return ret; + } + /** * Delete the CRC files from the local file system associated with a * particular file and its metadata file. http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java index dbd1602..befccfe 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java @@ -23,14 +23,12 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.sysml.hops.OptimizerUtils; @@ -58,28 +56,6 @@ public abstract class MatrixReader public abstract MatrixBlock readMatrixFromInputStream( InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz ) throws IOException, DMLRuntimeException; - public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) - throws IOException - { - Path[] ret = null; - - if( fs.isDirectory(file) ) - { - LinkedList<Path> tmp = new LinkedList<Path>(); - FileStatus[] dStatus = fs.listStatus(file); - for( FileStatus fdStatus : dStatus ) - if( !fdStatus.getPath().getName().startsWith("_") ) //skip internal files - tmp.add(fdStatus.getPath()); - ret = tmp.toArray(new Path[0]); - } - else - { - ret = new Path[]{ file }; - } - - return ret; - } - /** * NOTE: mallocDense controls if the output matrix blocks is fully allocated, this can be redundant * if binary block read and single block. http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java index 16241c9..0bca17d 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java @@ -138,7 +138,7 @@ public class ReaderBinaryBlock extends MatrixReader if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) MRJobConfiguration.addBinaryBlockSerializationFramework( job ); - for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files { //directly read from sequence files (individual partfiles) SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); @@ -207,7 +207,7 @@ public class ReaderBinaryBlock extends MatrixReader if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) MRJobConfiguration.addBinaryBlockSerializationFramework( job ); - for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files { //directly read from sequence files (individual partfiles) SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java index 2afdb0e..e7114d8 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java @@ -89,7 +89,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock //create read tasks for all files ExecutorService pool = Executors.newFixedThreadPool(_numThreads); ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>(); - for( Path lpath : getSequenceFilePaths(fs, path) ){ + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ){ ReadFileTask t = new ReadFileTask(lpath, job, fs, dest, rlen, clen, brlen, bclen); tasks.add(t); } http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java index 2bdc25e..00a1301 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java @@ -79,7 +79,7 @@ public class ReaderBinaryCell extends MatrixReader try { - for( Path lpath : getSequenceFilePaths(fs,path) ) //1..N files + for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs,path) ) //1..N files { //directly read from sequence files (individual partfiles) SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java index 5bdccfd..be6e177 100644 --- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java +++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java @@ -186,7 +186,9 @@ public class MapReduceTool public static boolean isFileEmpty(FileSystem fs, Path dir) throws IOException { FileStatus fstat = fs.getFileStatus(dir); - if (fstat.isDirectory()) { + if( fstat.isDirectory() + || IOUtilFunctions.isObjectStoreFileScheme(dir) ) + { // it is a directory FileStatus[] stats = fs.listStatus(dir); if (stats != null) {
