Repository: systemml
Updated Branches:
  refs/heads/master 7602af94f -> 717831daf


[SYSTEMML-1697] Fix cp multi-part binary read from object stores

Our CP binary readers read sequence files directly rather than going
through record readers. When reading from object stores, this causes
problems for multi-part files because the parent directory does not
physically exist. Furthermore, the listing of part files also contains
the mtd file because it shares the common directory prefix. This patch
makes the binary readers more robust for file systems that do not fully
comply with the HDFS file system API.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/717831da
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/717831da
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/717831da

Branch: refs/heads/master
Commit: 717831daff16d4c87381e6b8d4f48baa9c2922b4
Parents: 7602af9
Author: Matthias Boehm <[email protected]>
Authored: Wed Jun 14 20:18:44 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Jun 14 20:22:44 2017 -0700

----------------------------------------------------------------------
 .../parfor/DataPartitionerLocal.java            |  7 ++--
 .../controlprogram/parfor/RemoteDPParForMR.java |  3 +-
 .../controlprogram/parfor/RemoteParForMR.java   |  3 +-
 .../parfor/ResultMergeLocalFile.java            |  7 ++--
 .../ParameterizedBuiltinCPFileInstruction.java  |  5 +--
 .../apache/sysml/runtime/io/FrameReader.java    | 24 ------------
 .../runtime/io/FrameReaderBinaryBlock.java      |  4 +-
 .../io/FrameReaderBinaryBlockParallel.java      |  2 +-
 .../sysml/runtime/io/IOUtilFunctions.java       | 39 ++++++++++++++++++++
 .../apache/sysml/runtime/io/MatrixReader.java   | 24 ------------
 .../sysml/runtime/io/ReaderBinaryBlock.java     |  4 +-
 .../runtime/io/ReaderBinaryBlockParallel.java   |  2 +-
 .../sysml/runtime/io/ReaderBinaryCell.java      |  2 +-
 .../sysml/runtime/util/MapReduceTool.java       |  4 +-
 14 files changed, 59 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
index f0b5674..b57f050 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -48,7 +48,6 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -246,7 +245,7 @@ public class DataPartitionerLocal extends DataPartitioner
                        MatrixIndexes key = new MatrixIndexes();
                        MatrixCell value = new MatrixCell();
        
-                       for( Path lpath : MatrixReader.getSequenceFilePaths(fs, 
path) )
+                       for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) )
                        {
                                SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                                try
@@ -333,7 +332,7 @@ public class DataPartitionerLocal extends DataPartitioner
                        MatrixIndexes key = new MatrixIndexes(); 
                        MatrixBlock value = new MatrixBlock();
                        
-                       for(Path lpath : MatrixReader.getSequenceFilePaths(fs, 
path) )
+                       for(Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) )
                        {
                                SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                                try
@@ -408,7 +407,7 @@ public class DataPartitionerLocal extends DataPartitioner
                        
                        LinkedList<Cell> buffer = new LinkedList<Cell>();
                        
-                       for(Path lpath : MatrixReader.getSequenceFilePaths(fs, 
path) )
+                       for(Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) )
                        {
                                SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                                try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index 60d23c6..2bccdba 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -52,7 +52,6 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
@@ -253,7 +252,7 @@ public class RemoteDPParForMR
                Text value = new Text();               //serialized var header 
(incl filename)
                
                int countAll = 0;
-               for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+               for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) )
                {
                        SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                        try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index 67c3368..1a3b8c3 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -50,7 +50,6 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat;
 import org.apache.sysml.runtime.instructions.cp.Data;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
@@ -264,7 +263,7 @@ public class RemoteParForMR
                Text value = new Text();               //serialized var header 
(incl filename)
                
                int countAll = 0;
-               for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
+               for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) )
                {
                        SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                        try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 7dae670..9b274be 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -50,7 +50,6 @@ import 
org.apache.sysml.runtime.controlprogram.parfor.util.Cell;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.IJV;
@@ -340,7 +339,7 @@ public class ResultMergeLocalFile extends ResultMerge
                                        JobConf tmpJob = new 
JobConf(ConfigurationManager.getCachedJobConf());
                                        Path tmpPath = new 
Path(in.getFileName());
                                        
-                                       for(Path lpath : 
MatrixReader.getSequenceFilePaths(fs, tmpPath) )
+                                       for(Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, tmpPath) )
                                        {
                                                SequenceFile.Reader reader = 
new SequenceFile.Reader(fs,lpath,tmpJob);
                                                try
@@ -477,7 +476,7 @@ public class ResultMergeLocalFile extends ResultMerge
                Path tmpPath = new Path(mo.getFileName());
                FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob);
                
-               for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath))
+               for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
tmpPath))
                {
                        SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,tmpJob);
                        try
@@ -576,7 +575,7 @@ public class ResultMergeLocalFile extends ResultMerge
                int brlen = mc.getRowsPerBlock();
                int bclen = mc.getColsPerBlock();
                
-               for(Path lpath: MatrixReader.getSequenceFilePaths(fs, path))
+               for(Path lpath: IOUtilFunctions.getSequenceFilePaths(fs, path))
                {
                        SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                        try

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
index 6a7127f..3046c7e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
@@ -57,7 +57,6 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import 
org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.io.MatrixWriter;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
@@ -330,7 +329,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        MatrixIndexes key = new MatrixIndexes();
                        MatrixCell value = new MatrixCell();
 
-                       for(Path lpath: MatrixReader.getSequenceFilePaths(fs, 
path))
+                       for(Path lpath: 
IOUtilFunctions.getSequenceFilePaths(fs, path))
                        {
                                SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                                try
@@ -387,7 +386,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        MatrixBlock value = new MatrixBlock();
                        boolean diagBlocks = true;
                        
-                       for(Path lpath : MatrixReader.getSequenceFilePaths(fs, 
path))
+                       for(Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path))
                        {
                                SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                                

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 03271d8..584ff4d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -22,9 +22,7 @@ package org.apache.sysml.runtime.io;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.LinkedList;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -86,28 +84,6 @@ public abstract class FrameReader
                return (clen < 0) ? new String[0] : 
                        FrameBlock.createColNames((int)clen);
        }
-
-       public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
-               throws IOException
-       {
-               Path[] ret = null;
-               
-               if( fs.isDirectory(file) )
-               {
-                       LinkedList<Path> tmp = new LinkedList<Path>();
-                       FileStatus[] dStatus = fs.listStatus(file);
-                       for( FileStatus fdStatus : dStatus )
-                               if( 
!fdStatus.getPath().getName().startsWith("_") ) //skip internal files
-                                       tmp.add(fdStatus.getPath());
-                       ret = tmp.toArray(new Path[0]);
-               }
-               else
-               {
-                       ret = new Path[]{ file };
-               }
-               
-               return ret;
-       }
        
        /**
         * NOTE: mallocDense controls if the output matrix blocks is fully 
allocated, this can be redundant

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index e08aa96..7bd2c00 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -73,7 +73,7 @@ public class FrameReaderBinaryBlock extends FrameReader
                throws IOException, DMLRuntimeException
        {
                //sequential read from sequence files
-               for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+               for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) ) //1..N files 
                        readBinaryBlockFrameFromSequenceFile(lpath, job, fs, 
dest);
        }
 
@@ -135,7 +135,7 @@ public class FrameReaderBinaryBlock extends FrameReader
                FrameBlock value = new FrameBlock();
                
                //read first block from first file
-               Path lpath = getSequenceFilePaths(fs, path)[0];  
+               Path lpath = IOUtilFunctions.getSequenceFilePaths(fs, path)[0]; 
 
                SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);             
                try {
                        reader.next(key, value);

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
index 7b2e790..45718f9 100644
--- 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
+++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
@@ -52,7 +52,7 @@ public class FrameReaderBinaryBlockParallel extends 
FrameReaderBinaryBlock
                        //create read tasks for all files
                        ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
                        ArrayList<ReadFileTask> tasks = new 
ArrayList<ReadFileTask>();
-                       for( Path lpath : getSequenceFilePaths(fs, path) )
+                       for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) )
                                tasks.add(new ReadFileTask(lpath, job, fs, 
dest));
 
                        //wait until all tasks have been executed

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 44d9aee..12b7438 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -27,12 +27,14 @@ import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.LinkedList;
 
 import org.apache.commons.io.input.ReaderInputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -78,6 +80,14 @@ public class IOUtilFunctions
                        || scheme1.equals(scheme2);
        }
        
+       public static boolean isObjectStoreFileScheme(Path path) {
+               if( path == null || path.toUri() == null || 
path.toUri().getScheme() == null )
+                       return false;
+               String scheme = path.toUri().getScheme();
+               //prefix match covers the scheme variants s3, s3n, s3a, swift, swift2d
+               return scheme.startsWith("s3") || scheme.startsWith("swift");
+       }
+       
        public static void closeSilently( Closeable io ) {
                try {
                        if( io != null )
@@ -436,6 +446,35 @@ public class IOUtilFunctions
                return ncol;
        }
 
+       public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
+               throws IOException
+       {
+               Path[] ret = null;
+               
+               //Note on object stores: Since the object store file system 
implementations 
+               //only emulate a file system, the directory of a multi-part 
file does not
+               //exist physically and hence the isDirectory call returns 
false. Furthermore,
+               //listStatus call returns all files with the given directory as 
prefix, which
+               //includes the mtd file; it is skipped by comparing file names.
+               
+               if( fs.isDirectory(file) 
+                       || IOUtilFunctions.isObjectStoreFileScheme(file) )
+               {
+                       LinkedList<Path> tmp = new LinkedList<Path>();
+                       FileStatus[] dStatus = fs.listStatus(file);
+                       for( FileStatus fdStatus : dStatus )
+                               if( 
!fdStatus.getPath().getName().startsWith("_") //skip internal files
+                                       && 
!fdStatus.getPath().getName().equals(file.getName()+".mtd") )  //skip mtd file (Path.equals(String) never matches)
+                                       tmp.add(fdStatus.getPath());
+                       ret = tmp.toArray(new Path[0]);
+               }
+               else {
+                       ret = new Path[]{ file };
+               }
+               
+               return ret;
+       }
+       
        /**
         * Delete the CRC files from the local file system associated with a
         * particular file and its metadata file.

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java 
b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index dbd1602..befccfe 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -23,14 +23,12 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -58,28 +56,6 @@ public abstract class MatrixReader
        public abstract MatrixBlock readMatrixFromInputStream( InputStream is, 
long rlen, long clen, int brlen, int bclen, long estnnz )
                        throws IOException, DMLRuntimeException;
        
-       public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
-               throws IOException
-       {
-               Path[] ret = null;
-               
-               if( fs.isDirectory(file) )
-               {
-                       LinkedList<Path> tmp = new LinkedList<Path>();
-                       FileStatus[] dStatus = fs.listStatus(file);
-                       for( FileStatus fdStatus : dStatus )
-                               if( 
!fdStatus.getPath().getName().startsWith("_") ) //skip internal files
-                                       tmp.add(fdStatus.getPath());
-                       ret = tmp.toArray(new Path[0]);
-               }
-               else
-               {
-                       ret = new Path[]{ file };
-               }
-               
-               return ret;
-       }
-       
        /**
         * NOTE: mallocDense controls if the output matrix blocks is fully 
allocated, this can be redundant
         * if binary block read and single block. 

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 16241c9..0bca17d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -138,7 +138,7 @@ public class ReaderBinaryBlock extends MatrixReader
                if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
                        
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
                
-               for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+               for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) ) //1..N files 
                {
                        //directly read from sequence files (individual 
partfiles)
                        SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
@@ -207,7 +207,7 @@ public class ReaderBinaryBlock extends MatrixReader
                if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
                        
MRJobConfiguration.addBinaryBlockSerializationFramework( job );
                
-               for( Path lpath : getSequenceFilePaths(fs, path) ) //1..N files 
+               for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
path) ) //1..N files 
                {
                        //directly read from sequence files (individual 
partfiles)
                        SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 2afdb0e..e7114d8 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -89,7 +89,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                        //create read tasks for all files
                        ExecutorService pool = 
Executors.newFixedThreadPool(_numThreads);
                        ArrayList<ReadFileTask> tasks = new 
ArrayList<ReadFileTask>();
-                       for( Path lpath : getSequenceFilePaths(fs, path) ){
+                       for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) ){
                                ReadFileTask t = new ReadFileTask(lpath, job, 
fs, dest, rlen, clen, brlen, bclen);
                                tasks.add(t);
                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index 2bdc25e..00a1301 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -79,7 +79,7 @@ public class ReaderBinaryCell extends MatrixReader
                
                try
                {
-                       for( Path lpath : getSequenceFilePaths(fs,path) ) 
//1..N files 
+                       for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs,path) ) //1..N files 
                        {
                                //directly read from sequence files (individual 
partfiles)
                                SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);

http://git-wip-us.apache.org/repos/asf/systemml/blob/717831da/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java 
b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index 5bdccfd..be6e177 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -186,7 +186,9 @@ public class MapReduceTool
        public static boolean isFileEmpty(FileSystem fs, Path dir) throws 
IOException {
                FileStatus fstat = fs.getFileStatus(dir);
 
-               if (fstat.isDirectory()) {
+               if( fstat.isDirectory() 
+                       || IOUtilFunctions.isObjectStoreFileScheme(dir) )
+               {
                        // it is a directory
                        FileStatus[] stats = fs.listStatus(dir);
                        if (stats != null) {

Reply via email to