Repository: incubator-systemml
Updated Branches:
  refs/heads/master 88f4a468f -> 557aae0ff


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
index 8fcc725..aedd2b0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java
@@ -152,8 +152,9 @@ public class CSVReblockMapper extends MapperBase implements Mapper<LongWritable,
                byte matrixIndex=representativeMatrixes.get(0);
                try 
                {
-                       FileSystem fs = FileSystem.get(job);
-                       Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
+                       Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
+                       FileSystem fs = IOUtilFunctions.getFileSystem(thisPath, job);
+                       thisPath = thisPath.makeQualified(fs);
                        String filename=thisPath.toString();
                        Path headerPath=new Path(job.getStrings(CSVReblockMR.SMALLEST_FILE_NAME_PER_INPUT)[matrixIndex]).makeQualified(fs);
                        if(headerPath.toString().equals(filename))
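
Note on the recurring pattern: throughout this patch set, the default-filesystem lookup FileSystem.get(job) is replaced by a lookup derived from the path that is actually being accessed, so that paths on a non-default file system (e.g., local files next to HDFS) resolve to the correct FileSystem instance. A minimal sketch of such a path-aware helper, assuming it simply delegates to Hadoop's Path.getFileSystem(Configuration) (the actual IOUtilFunctions.getFileSystem implementation may differ):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PathAwareFileSystemExample {
        // Resolve the file system that owns the given path (hdfs://, file://, ...)
        // instead of always returning the cluster's default file system.
        public static FileSystem getFileSystem(Path path, Configuration conf) throws IOException {
            return path.getFileSystem(conf);
        }

        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            Path local = new Path("file:///tmp/example.csv");
            // resolves to the local file system even if fs.defaultFS points to HDFS
            System.out.println(getFileSystem(local, conf).getUri());
        }
    }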

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
index a7d881d..50a7412 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java
@@ -70,6 +70,7 @@ import org.apache.sysml.runtime.instructions.mr.ReblockInstruction;
 import org.apache.sysml.runtime.instructions.mr.RemoveEmptyMRInstruction;
 import org.apache.sysml.runtime.instructions.mr.UnaryMRInstructionBase;
 import org.apache.sysml.runtime.io.BinaryBlockSerialization;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.AddDummyWeightConverter;
 import org.apache.sysml.runtime.matrix.data.BinaryBlockToBinaryCellConverter;
@@ -748,10 +749,9 @@ public class MRJobConfiguration
                for(int i=0; i<matrices.length; i++)
                        matrices[i]=new Path(matrices[i]).toString();
                
-               FileSystem fs=FileSystem.get(job);
-               Path thisFile=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
-               
-               //Path p=new Path(thisFileName);
+               Path thisFile=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
+               FileSystem fs = IOUtilFunctions.getFileSystem(thisFile, job);
+               thisFile = thisFile.makeQualified(fs);
                
                Path thisDir=thisFile.getParent().makeQualified(fs);
                ArrayList<Byte> representativeMatrixes=new ArrayList<Byte>();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java
index a0faa15..350f65b 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java
@@ -77,9 +77,9 @@ public class CompactInputFormat<K extends WritableComparable, V extends Writable
                @SuppressWarnings("unchecked")
                public CompactInputRecordReader(JobConf job, FileSplit split) throws IOException {
 
-               fs = FileSystem.get(job);
                path = split.getPath();
-               totLength = split.getLength();
+               fs = IOUtilFunctions.getFileSystem(path, job);
+                       totLength = split.getLength();
                currentStream = fs.open(path);
                keyClass=(Class<? extends WritableComparable>) job.getClass(KEY_CLASS, WritableComparable.class);
                valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
index 3559593..04899f3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
@@ -242,7 +242,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                        // check if the current part file needs to be processed
                path = split.getPath();
                totLength = split.getLength();
-               currentStream = FileSystem.get(job).open(path);
+               currentStream = IOUtilFunctions.getFileSystem(path, job).open(path);
                currPart = getIndexInTheArray(path.getName());
                
                if ( currPart < beginPart || currPart > endPart ) {
@@ -394,9 +394,9 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                public PickRecordReader(JobConf job, FileSplit split)
                        throws IOException
                {
-                       fs = FileSystem.get(job);
-               path = split.getPath();
-               currentStream = fs.open(path);
+                       path = split.getPath();
+                       fs = IOUtilFunctions.getFileSystem(path, job);
+                       currentStream = fs.open(path);
                int partIndex=getIndexInTheArray(path.getName());
                String arrStr=job.get(SELECTED_POINTS_PREFIX+partIndex);
                if(arrStr==null || arrStr.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
index 6d9cb1f..77c06ae 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java
@@ -72,9 +72,9 @@ public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable,
                        OffsetCount value=new OffsetCount();
                        Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
                        
-                       FileSystem fs = FileSystem.get(job);
-                       Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
-                       String thisfile=thisPath.toString();
+                       Path path=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+                       String thisfile=path.makeQualified(fs).toString();
 
                        SequenceFile.Reader reader = null;
                        try {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
index 3ee0e76..e2885d8 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.CSVReblockMR;
 import org.apache.sysml.runtime.matrix.JobReturn;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -80,7 +81,7 @@ public class ApplyTfCSVMR {
                FileInputFormat.addInputPath(job, new Path(inputPath));
                // delete outputPath, if exists already.
                Path outPath = new Path(outputPath);
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job);
                fs.delete(outPath, true);
                FileOutputFormat.setOutputPath(job, outPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
index db3be0f..05b8a19 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java
@@ -75,7 +75,8 @@ public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable
                        
                        // setup the writer for mapper's output
                        // the default part-..... files will be deleted later once the job finishes 
-                       br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true)));
+                       FileSystem fs = IOUtilFunctions.getFileSystem(mapOutputPath);
+                       br = new BufferedWriter(new OutputStreamWriter(fs.create( mapOutputPath, true)));
                }
                
                // output the header line

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
index 722c6a9..4a7839d 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java
@@ -724,12 +724,12 @@ public class DataTransform
                TransformOperands oprnds = new TransformOperands(insts[0], inputs[0]);
                
                JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                
                // find the first file in alphabetical ordering of part files in directory inputPath 
                String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath);
                
                // find column names
+               FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile);
                String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile);
                HashMap<String, Integer> colNamesToIds = processColumnNames(fs, oprnds.inputCSVProperties, headerLine, smallestFile);
                String outHeader = getOutputHeader(fs, headerLine, oprnds);
@@ -990,9 +990,10 @@ public class DataTransform
                throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException 
        {
                JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
+               
                // find the first file in alphabetical ordering of partfiles in directory inputPath 
                String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath);
+               FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile);
                
                // find column names
                String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile);
@@ -1364,7 +1365,7 @@ public class DataTransform
                TransformOperands oprnds = new TransformOperands(inst.getParams(), inputs[0]);
                
                JobConf job = new JobConf();
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(inputs[0].getFileName());
                
                checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, outputs[0].getFileName(), fs);
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
index da298ae..68024d9 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
@@ -70,7 +71,7 @@ public class GenTfMtdMR {
                FileInputFormat.addInputPath(job, new Path(inputPath));
                // delete outputPath, if exists already.
                Path outPath = new Path(txMtdPath);
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job);
                fs.delete(outPath, true);
                FileOutputFormat.setOutputPath(job, outPath);
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 58868e0..9e30f5c 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -186,9 +186,9 @@ public class TfUtils implements Serializable{
                                return false;
 
                // check for empty file
-               if (MapReduceTool.isFileEmpty(fs, path.toString()))
+               if( MapReduceTool.isFileEmpty(fs, path) )
                        if ( err )
-                       throw new EOFException("Empty input file " + path.toString() + ".");
+                               throw new EOFException("Empty input file " + path.toString() + ".");
                        else
                                return false;
                
@@ -196,21 +196,18 @@ public class TfUtils implements Serializable{
        }
        
        public static String getPartFileName(JobConf job) throws IOException {
-               FileSystem fs = FileSystem.get(job);
-               Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
-               return thisPath.toString();
+               Path path = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               path = path.makeQualified(fs);
+               return path.toString();
        }
        
        public static boolean isPartFileWithHeader(JobConf job) throws IOException {
-               FileSystem fs = FileSystem.get(job);
-               
                String thisfile=getPartFileName(job);
-               Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs);
-               
-               if(thisfile.toString().equals(smallestFilePath.toString()))
-                       return true;
-               else
-                       return false;
+               Path path = new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE));
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               path = path.makeQualified(fs);
+               return thisfile.toString().equals(path.toString());
        }
        
        /**
@@ -372,8 +369,8 @@ public class TfUtils implements Serializable{
                        fs = FileSystem.getLocal(job);
                }
                else {
-                       fs = FileSystem.get(job);
                        tfMtdDir = new Path(getTfMtdDir());
+                       fs = IOUtilFunctions.getFileSystem(tfMtdDir, job);
                }
                
                // load transformation metadata 
@@ -501,7 +498,7 @@ public class TfUtils implements Serializable{
        private Reader initOffsetsReader(JobConf job) throws IOException 
        {
                Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME));
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                Path[] files = MatrixReader.getSequenceFilePaths(fs, path);
                if ( files.length != 1 )
                        throw new IOException("Expecting a single file under 
counters file: " + path.toString());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
index 82f1463..0d74088 100644
--- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
@@ -43,7 +43,6 @@ import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
@@ -441,36 +440,6 @@ public class LocalFileUtils
        }
        
        /**
-        * Validate external directory and filenames as soon as they enter the system
-        * in order to prevent security issues such as path traversal, etc.
-        * Currently, external (user provided) filenames are: scriptfile, config file,
-        * local tmp working dir, hdfs working dir (scratch), read/write expressions,
-        * and several export functionalities.   
-        *  
-        * 
-        * @param fname file name
-        * @param hdfs true if check for HDFS
-        * @return true if valid filename
-        */
-       public static boolean validateExternalFilename( String fname, boolean hdfs )
-       {
-               boolean ret = true;
-               
-               //check read local file from hdfs context
-               //(note: currently rejected with "wrong fs" anyway but this is impl-specific)
-               if( hdfs && !InfrastructureAnalyzer.isLocalMode() && fname.startsWith("file:") )
-               {
-                       //prevent redirection to local file system
-                       ret = false; 
-               }
-               
-               //TODO white and black lists according to BI requirements
-               
-               return ret;
-       }
-       
-       /**
         * Writes a simple text file to local file system.
         * 
         * @param file output file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index cd32a0c..e81dbc1 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -68,12 +68,6 @@ public class MapReduceTool
        private static final int MAX_DELETE_RETRIES = 10;
        
        private static final Log LOG = LogFactory.getLog(MapReduceTool.class.getName());
-       private static JobConf _rJob = null; //cached job conf for read-only operations
-       
-       static{
-               _rJob = ConfigurationManager.getCachedJobConf();
-       }
-       
        
        public static String getUniqueKeyPerTask(JobConf job, boolean inMapper) {
                //TODO: investigate ID pattern, required for parallel jobs
@@ -109,10 +103,15 @@ public class MapReduceTool
                return job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID);
        }
 
-       public static boolean existsFileOnHDFS(String fname){
+       public static boolean existsFileOnHDFS(String fname) {
+               //robustness for empty strings (e.g., JMLC, MLContext)
+               if( fname == null || fname.isEmpty() )
+                       return false;
+               
                try {
-                       return FileSystem.get(_rJob)
-                               .exists(new Path(fname));
+                       Path path = new Path(fname);
+                       return IOUtilFunctions
+                               .getFileSystem(path).exists(path);
                }
                catch(Exception ex) {
                        LOG.error("Failed check existsFileOnHDFS.", ex);
@@ -121,9 +120,14 @@ public class MapReduceTool
        }
        
        public static boolean isDirectory(String fname) {
+               //robustness for empty strings (e.g., JMLC, MLContext)
+               if( fname == null || fname.isEmpty() )
+                       return false;
+               
                try {
-                       return FileSystem.get(_rJob)
-                               .isDirectory(new Path(fname));
+                       Path path = new Path(fname);
+                       return IOUtilFunctions
+                               .getFileSystem(path).isDirectory(path);
                }
                catch(Exception ex) {
                        LOG.error("Failed check isDirectory.", ex);
@@ -133,8 +137,9 @@ public class MapReduceTool
        
        public static FileStatus[] getDirectoryListing(String fname) {
                try {
-                       return FileSystem.get(_rJob)
-                               .listStatus(new Path(fname));
+                       Path path = new Path(fname);
+                       return IOUtilFunctions
+                               .getFileSystem(path).listStatus(path);
                }
                catch(Exception ex) {
                        LOG.error("Failed listing of directory contents.", ex);
@@ -148,11 +153,12 @@ public class MapReduceTool
        }
        
        public static void deleteFileIfExistOnHDFS(String dir) throws IOException {
-               deleteFileIfExists(FileSystem.get(_rJob), new Path(dir));
+               Path path = new Path(dir);
+               deleteFileIfExists(IOUtilFunctions.getFileSystem(path), path);
        }
 
        public static void deleteFileIfExistOnHDFS(Path outpath, JobConf job) throws IOException {
-               deleteFileIfExists(FileSystem.get(job), outpath);
+               deleteFileIfExists(IOUtilFunctions.getFileSystem(outpath, job), outpath);
        }
        
        public static void deleteFileIfExistOnLFS(Path outpath, JobConf job) throws IOException {
@@ -169,17 +175,20 @@ public class MapReduceTool
        }
 
        public static boolean isHDFSFileEmpty(String dir) throws IOException {
-               FileSystem fs = FileSystem.get(_rJob);
-               return isFileEmpty(fs, dir);
+               //robustness for empty strings (e.g., JMLC, MLContext)
+               if( dir == null || dir.isEmpty() )
+                       return false;
+               Path path = new Path(dir);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               return isFileEmpty(fs, path);
        }
 
-       public static boolean isFileEmpty(FileSystem fs, String dir) throws IOException {
-               Path pth = new Path(dir);
-               FileStatus fstat = fs.getFileStatus(pth);
+       public static boolean isFileEmpty(FileSystem fs, Path dir) throws IOException {
+               FileStatus fstat = fs.getFileStatus(dir);
 
                if (fstat.isDirectory()) {
                        // it is a directory
-                       FileStatus[] stats = fs.listStatus(pth);
+                       FileStatus[] stats = fs.listStatus(dir);
                        if (stats != null) {
                                for (FileStatus stat : stats) {
                                        if (stat.getLen() > 0)
@@ -189,33 +198,35 @@ public class MapReduceTool
                        } else {
                                return true;
                        }
-               } else {
+               } 
+               else {
                        // it is a regular file
-                       if (fstat.getLen() == 0)
-                               return true;
-                       else
-                               return false;
+                       return (fstat.getLen() == 0);
                }
        }
 
        public static void renameFileOnHDFS(String originalDir, String newDir) throws IOException {
-               Path originalpath = new Path(originalDir);
+               Path pathOrig = new Path(originalDir);
+               Path pathNew = new Path(newDir);
+               if( !IOUtilFunctions.isSameFileScheme(pathOrig, pathNew) )
+                       throw new IOException("Cannot rename files to different target file system.");
                
                deleteFileIfExistOnHDFS(newDir);
-               Path newpath = new Path(newDir);
-               
-               FileSystem fs = FileSystem.get(_rJob);
-               if (fs.exists(originalpath)) {
-                       fs.rename(originalpath, newpath);
-               }
-               else {
+               FileSystem fs = IOUtilFunctions.getFileSystem(pathOrig);
+               if( fs.exists(pathOrig) )
+                       fs.rename(pathOrig, pathNew);
+               else
                        throw new FileNotFoundException(originalDir);
-               }
        }
        
        public static void mergeIntoSingleFile(String originalDir, String newFile) throws IOException {
-               FileSystem fs = FileSystem.get(_rJob);
-               FileUtil.copyMerge(fs, new Path(originalDir), fs, new Path(newFile), true, _rJob, null);
+               Path pathOrig = new Path(originalDir);
+               Path pathNew = new Path(newFile);
+               if( !IOUtilFunctions.isSameFileScheme(pathOrig, pathNew) )
+                       throw new IOException("Cannot merge files into different target file system.");
+               FileSystem fs = IOUtilFunctions.getFileSystem(pathOrig);
+               FileUtil.copyMerge(fs, pathOrig, fs, pathNew, true, 
+                       ConfigurationManager.getCachedJobConf(), null);
        }
 
        public static void copyFileOnHDFS(String originalDir, String newDir) throws IOException {
@@ -225,7 +236,7 @@ public class MapReduceTool
                boolean overwrite = true;
                
                JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(originalPath, job);
                if (fs.exists(originalPath)) {
                        FileUtil.copy(fs, originalPath, fs, newPath, deleteSource, overwrite, job);
                }
@@ -241,7 +252,7 @@ public class MapReduceTool
        public static long getFilesizeOnHDFS( Path path ) 
                throws IOException
        {
-               FileSystem fs = FileSystem.get(_rJob);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
                long ret = 0; //in bytes
                if( fs.isDirectory(path) )
                        ret = fs.getContentSummary(path).getLength();
@@ -253,9 +264,9 @@ public class MapReduceTool
        }
        
        private static BufferedReader setupInputFile ( String filename ) throws IOException {
-        Path pt=new Path(filename);
-        FileSystem fs = FileSystem.get(_rJob);
-        BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(pt)));
+        Path path = new Path(filename);
+        FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
         return br;
        }
        
@@ -305,9 +316,9 @@ public class MapReduceTool
        }
                
        private static BufferedWriter setupOutputFile ( String filename ) throws IOException {
-        Path pt=new Path(filename);
-        FileSystem fs = FileSystem.get(_rJob);
-        BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
+        Path path = new Path(filename);
+        FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
         return br;
        }
        
@@ -351,17 +362,17 @@ public class MapReduceTool
        public static MatrixCharacteristics[] processDimsFiles(String dir, MatrixCharacteristics[] stats) 
                throws IOException 
        {
-               Path pt=new Path(dir);
-        FileSystem fs = FileSystem.get(_rJob);
+               Path path = new Path(dir);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
                
-        if ( !fs.exists(pt) )
+        if ( !fs.exists(path) )
                return stats;
         
-        FileStatus fstat = fs.getFileStatus(pt);
+        FileStatus fstat = fs.getFileStatus(path);
                
         if ( fstat.isDirectory() ) 
         {
-                       FileStatus[] files = fs.listStatus(pt);
+                       FileStatus[] files = fs.listStatus(path);
                        for ( int i=0; i < files.length; i++ ) {
                                Path filePath = files[i].getPath();
                                try( BufferedReader br = setupInputFile(filePath.toString()) ) {
@@ -405,9 +416,9 @@ public class MapReduceTool
                        OutputInfo outinfo, FileFormatProperties formatProperties) 
                throws IOException 
        {
-               Path pt = new Path(mtdfile);
-               FileSystem fs = FileSystem.get(_rJob);
-               try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) {
+               Path path = new Path(mtdfile);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) {
                        String mtd = metaDataToString(vt, schema, dt, mc, outinfo, formatProperties);
                        br.write(mtd);
                } catch (Exception e) {
@@ -418,9 +429,9 @@ public class MapReduceTool
        public static void writeScalarMetaDataFile(String mtdfile, ValueType vt) 
                throws IOException 
        {
-               Path pt = new Path(mtdfile);
-               FileSystem fs = FileSystem.get(_rJob);
-               try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) {
+               Path path = new Path(mtdfile);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) {
                        String mtd = metaDataToString(vt, null, DataType.SCALAR, null, OutputInfo.TextCellOutputInfo, null);
                        br.write(mtd);
                } 
@@ -555,8 +566,8 @@ public class MapReduceTool
                else
                        offset=(int)pos-1;
                
-               FileSystem fs=FileSystem.get(_rJob);
                Path path=new Path(dir);
+               FileSystem fs=IOUtilFunctions.getFileSystem(path);
                FileStatus[] files=fs.listStatus(path);
                Path fileToRead=null;
                for(FileStatus file: files)
@@ -610,11 +621,16 @@ public class MapReduceTool
        }
 
        public static void createDirIfNotExistOnHDFS(String dir, String permissions) 
+               throws IOException 
+       {
+               createDirIfNotExistOnHDFS(new Path(dir), permissions);
+       }
+       
+       public static void createDirIfNotExistOnHDFS(Path path, String permissions) 
                throws IOException
        {
-               Path path = new Path(dir);
                try {
-                       FileSystem fs = FileSystem.get(_rJob);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path);
                        if( !fs.exists(path) ) 
                        {
                                char[] c = permissions.toCharArray();
@@ -637,8 +653,8 @@ public class MapReduceTool
        public static FSDataOutputStream getHDFSDataOutputStream(String filename, boolean overwrite) 
                throws IOException
        {
-               FileSystem fs = FileSystem.get(_rJob);
                Path path = new Path(filename);
-               return fs.create(path, overwrite);
+               return IOUtilFunctions.getFileSystem(path)
+                       .create(path, overwrite);
        }
 }
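
Note: the renameFileOnHDFS and mergeIntoSingleFile changes above additionally guard against mixing file systems via IOUtilFunctions.isSameFileScheme before reusing a single FileSystem handle for both paths. A minimal sketch of the kind of scheme check this presumably performs (the real helper may also handle authorities or scheme-less paths differently):

    import org.apache.hadoop.fs.Path;

    public class SameSchemeCheckExample {
        // Two paths can safely share one FileSystem handle for rename/merge only if
        // they live on the same scheme (e.g., both hdfs:// or both file://).
        public static boolean isSameFileScheme(Path p1, Path p2) {
            String s1 = p1.toUri().getScheme();
            String s2 = p2.toUri().getScheme();
            return (s1 == null && s2 == null) || (s1 != null && s1.equalsIgnoreCase(s2));
        }

        public static void main(String[] args) {
            System.out.println(isSameFileScheme(new Path("hdfs:///tmp/a"), new Path("hdfs:///tmp/b"))); // true
            System.out.println(isSameFileScheme(new Path("hdfs:///tmp/a"), new Path("file:///tmp/b"))); // false
        }
    }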

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java b/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java
index b45d225..195a9e0 100644
--- a/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java
+++ b/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.udf.FunctionParameter;
 import org.apache.sysml.udf.Matrix;
@@ -77,7 +78,7 @@ public class RemoveEmptyRows extends PackageFunction
                        //prepare input
                        JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
                        Path path = new Path(fnameOld);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                        if( !fs.exists(path) )  
                                throw new IOException("File "+fnameOld+" does not exist on HDFS.");
                        FileInputFormat.addInputPath(job, path); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java b/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java
index 8954951..ba2a9f1 100644
--- a/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java
+++ b/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java
@@ -43,6 +43,7 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.runtime.DMLScriptException;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 
 public class DMLAppMaster 
 {
@@ -136,7 +137,7 @@ public class DMLAppMaster
                
                //write given message to hdfs
                try {
-                       FileSystem fs = FileSystem.get(_conf);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(msgPath, _conf);
                        try( FSDataOutputStream fout = fs.create(msgPath, true) ) {
                                fout.writeBytes( msg );
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
index c4da603..eb7a492 100644
--- a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
+++ b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java
@@ -57,6 +57,7 @@ import org.apache.sysml.parser.ParseException;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.DMLScriptException;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
@@ -264,17 +265,17 @@ public class DMLYarnClient
        private void copyResourcesToHdfsWorkingDir( YarnConfiguration yconf, String hdfsWD ) 
                throws ParseException, IOException, DMLRuntimeException, InterruptedException 
        {
-               FileSystem fs = FileSystem.get(yconf);
+               Path confPath = new Path(hdfsWD, DML_CONFIG_NAME);
+               FileSystem fs = IOUtilFunctions.getFileSystem(confPath, yconf);
                
                //create working directory
-               MapReduceTool.createDirIfNotExistOnHDFS(hdfsWD, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               MapReduceTool.createDirIfNotExistOnHDFS(confPath, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
                
                //serialize the dml config to HDFS file 
                //NOTE: we do not modify and ship the absolute scratch space path of the current user
                //because this might result in permission issues if the app master is run with a different user
                //(runtime plan migration during resource reoptimizations now needs to use qualified names
                //for shipping/reading intermediates) TODO modify resource reoptimizer on prototype integration.
-               Path confPath = new Path(hdfsWD, DML_CONFIG_NAME);
                try( FSDataOutputStream fout = fs.create(confPath, true) ) {
                        fout.writeBytes(_dmlConfig.serializeDMLConfig() + "\n");
                }
@@ -451,7 +452,8 @@ public class DMLYarnClient
                Path path = new Path(_hdfsJarFile); 
                
                LocalResource resource = Records.newRecord(LocalResource.class);
-               FileStatus jarStat = FileSystem.get(yconf).getFileStatus(path);
+               FileStatus jarStat = IOUtilFunctions
+                       .getFileSystem(path, yconf).getFileStatus(path);
                resource.setResource(ConverterUtils.getYarnUrlFromPath(path));
                resource.setSize(jarStat.getLen());
                resource.setTimestamp(jarStat.getModificationTime());
@@ -518,9 +520,8 @@ public class DMLYarnClient
                
                //write given message to hdfs
                try {
-                       FileSystem fs = FileSystem.get(yconf);
-                       if( fs.exists(msgPath) )
-                       {
+                       FileSystem fs = IOUtilFunctions.getFileSystem(msgPath, yconf);
+                       if( fs.exists(msgPath) ) {
                                try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(msgPath))) ) {
                                        ret = br.readLine();
                                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java
index b66813d..a80e8b3 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java
@@ -34,6 +34,7 @@ import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.io.ReaderBinaryBlock;
 import org.apache.sysml.runtime.io.ReaderTextCSV;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
@@ -142,7 +143,7 @@ public class ScalingTest extends AutomatedTestBase
                }
                outputSpec.put(TfUtils.TXMETHOD_SCALE, scaleSpec);
                
-               FileSystem fs = FileSystem.get(TestUtils.conf);
+               FileSystem fs = IOUtilFunctions.getFileSystem(specFile);
                try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(specFile),true))) ) {
                        out.write(outputSpec.toString());
                }
@@ -157,7 +158,7 @@ public class ScalingTest extends AutomatedTestBase
                mtd.put("format", "csv");
                mtd.put("header", false);
                
-               FileSystem fs = FileSystem.get(TestUtils.conf);
+               FileSystem fs = IOUtilFunctions.getFileSystem(datafile);
                try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(datafile+".mtd"),true))) ) {
                        out.write(mtd.toString());
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/test/java/org/apache/sysml/test/utils/TestUtils.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/utils/TestUtils.java b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
index 19b741d..beb19a3 100644
--- a/src/test/java/org/apache/sysml/test/utils/TestUtils.java
+++ b/src/test/java/org/apache/sysml/test/utils/TestUtils.java
@@ -98,9 +98,9 @@ public class TestUtils
                try {
                        String lineExpected = null;
                        String lineActual = null;
-                       FileSystem fs = FileSystem.get(conf);
                        
                        Path compareFile = new Path(expectedFile);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(compareFile, conf);
                        FSDataInputStream fsin = fs.open(compareFile);
                        try( BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin)) ) {
                                lineExpected = compareIn.readLine();
@@ -130,9 +130,9 @@ public class TestUtils
                try {
                        HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>();
                        
-                       FileSystem fs = FileSystem.get(conf);
                        Path outDirectory = new Path(actualDir);
                        Path compareFile = new Path(expectedFile);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        FSDataInputStream fsin = fs.open(compareFile);
                        try( BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin)) ) {
                                String line;
@@ -207,9 +207,9 @@ public class TestUtils
         */
        public static void compareMMMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path outDirectory = new Path(actualDir);
                        Path compareFile = new Path(expectedFile);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        FSDataInputStream fsin = fs.open(compareFile);
                        
                        HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>();
@@ -300,9 +300,9 @@ public class TestUtils
         */
        public static void compareDMLMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path outDirectory = new Path(actualDir);
                        Path compareFile = new Path(expectedFile);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        FSDataInputStream fsin = fs.open(compareFile);
                        
                        HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>();
@@ -370,8 +370,8 @@ public class TestUtils
                
                try 
                {
-                       FileSystem fs = FileSystem.get(conf);
                        Path outDirectory = new Path(filePath);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        String line;
 
                        FileStatus[] outFiles = fs.listStatus(outDirectory);
@@ -462,11 +462,10 @@ public class TestUtils
        }
 
        public static double readDMLScalar(String filePath) {
-               FileSystem fs;
                try {
                        double d=Double.NaN;
-                       fs = FileSystem.get(conf);
                        Path outDirectory = new Path(filePath);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        String line;
                        FileStatus[] outFiles = fs.listStatus(outDirectory);
                        for (FileStatus file : outFiles) {
@@ -485,11 +484,10 @@ public class TestUtils
        }
 
        public static boolean readDMLBoolean(String filePath) {
-               FileSystem fs;
                try {
                        Boolean b = null;
-                       fs = FileSystem.get(conf);
                        Path outDirectory = new Path(filePath);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        String line;
                        FileStatus[] outFiles = fs.listStatus(outDirectory);
                        for (FileStatus file : outFiles) {
@@ -508,11 +506,10 @@ public class TestUtils
        }
        
        public static String readDMLString(String filePath) {
-               FileSystem fs;
                try {
                        StringBuilder sb =  new StringBuilder();
-                       fs = FileSystem.get(conf);
                        Path outDirectory = new Path(filePath);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        FileStatus[] outFiles = fs.listStatus(outDirectory);
                        for (FileStatus file : outFiles) {
                                FSDataInputStream fsout = fs.open(file.getPath());
@@ -989,8 +986,8 @@ public class TestUtils
         */
        public static void compareDMLHDFSFileWithRFile(String rFile, String hdfsDir, double epsilon) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path outDirectory = new Path(hdfsDir);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>();
                        HashMap<CellIndex, Double> actualValues = new HashMap<CellIndex, Double>();
                        try(BufferedReader compareIn = new BufferedReader(new FileReader(rFile))) {
@@ -1089,8 +1086,8 @@ public class TestUtils
         */
        public static void checkMatrix(String outDir, long rows, long cols, double min, double max) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path outDirectory = new Path(outDir);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        assertTrue(outDir + " does not exist", fs.exists(outDirectory));
                        
                        if( fs.getFileStatus(outDirectory).isDirectory() )
@@ -1143,8 +1140,8 @@ public class TestUtils
         */
        public static void checkForOutputExistence(String outDir) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path outDirectory = new Path(outDir);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
                        FileStatus[] outFiles = fs.listStatus(outDirectory);
                        assertEquals("number of files in directory not 1", 1, outFiles.length);
                        FSDataInputStream fsout = fs.open(outFiles[0].getPath());
@@ -1169,9 +1166,9 @@ public class TestUtils
         */
        public static void removeHDFSDirectories(String[] directories) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        for (String directory : directories) {
                                Path dir = new Path(directory);
+                               FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf);
                                if (fs.exists(dir) && fs.getFileStatus(dir).isDirectory()) {
                                        fs.delete(dir, true);
                                }
@@ -1219,9 +1216,9 @@ public class TestUtils
         */
        public static void removeHDFSFiles(String[] files) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        for (String directory : files) {
                                Path dir = new Path(directory);
+                               FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf);
                                if (fs.exists(dir) && !fs.getFileStatus(dir).isDirectory()) {
                                        fs.delete(dir, false);
                                }
@@ -1258,8 +1255,9 @@ public class TestUtils
         */
        public static void clearDirectory(String directory) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
-                       FileStatus[] directoryContent = fs.listStatus(new Path(directory));
+                       Path path = new Path(directory);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+                       FileStatus[] directoryContent = fs.listStatus(path);
                        for (FileStatus content : directoryContent) {
                                fs.delete(content.getPath(), true);
                        }
@@ -1368,8 +1366,8 @@ public class TestUtils
        public static void generateTestMatrixToFile(String file, int rows, int cols, double min, double max,
                        double sparsity, long seed) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path inFile = new Path(file);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(inFile, conf);
                        DataOutputStream out = fs.create(inFile);
                        try( PrintWriter pw = new PrintWriter(out) ) {
                                Random random = (seed == -1) ? TestUtils.random : new Random(seed);
@@ -1411,8 +1409,9 @@ public class TestUtils
                try 
                {
                        //create outputstream to HDFS / FS and writer
-                       FileSystem fs = FileSystem.get(conf);
-                       DataOutputStream out = fs.create(new Path(file), true);
+                       Path path = new Path(file);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+                       DataOutputStream out = fs.create(path, true);
                        try( BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) {
                                //writer actual matrix
                                StringBuilder sb = new StringBuilder();
@@ -1457,8 +1456,9 @@ public class TestUtils
                        //create outputstream to HDFS / FS and writer
                        DataOutputStream out = null;
                        if (!isR) {
-                               FileSystem fs = FileSystem.get(conf);
-                               out = fs.create(new Path(file), true);
+                               Path path = new Path(file);
+                               FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+                               out = fs.create(path, true);
                        } 
                        else {
                                out = new DataOutputStream(new FileOutputStream(file));
@@ -1605,7 +1605,9 @@ public class TestUtils
                try {
                        SequenceFile.Writer writer = null;
                        try {
-                               writer = new SequenceFile.Writer(FileSystem.get(conf), conf, new Path(file),
+                               Path path = new Path(file);
+                               FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+                               writer = new SequenceFile.Writer(fs, conf, path,
                                        MatrixIndexes.class, MatrixCell.class);
 
                                MatrixIndexes index = new MatrixIndexes();
@@ -1651,7 +1653,9 @@ public class TestUtils
                SequenceFile.Writer writer = null;
                        
                try {
-                       writer = new SequenceFile.Writer(FileSystem.get(conf), conf, new Path(file),
+                       Path path = new Path(file);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+                       writer = new SequenceFile.Writer(fs, conf, path,
                                        MatrixIndexes.class, MatrixBlock.class);
 
                        MatrixIndexes index = new MatrixIndexes();
@@ -1774,8 +1778,8 @@ public class TestUtils
         */
        public static void removeTemporaryFiles() {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path workingDir = new Path(".");
+                       FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf);
                        FileStatus[] files = fs.listStatus(workingDir);
                        for (FileStatus file : files) {
                                String fileName = file.getPath().toString().substring(
@@ -1799,8 +1803,8 @@ public class TestUtils
         */
        public static boolean checkForTemporaryFiles() {
                try {
-                       FileSystem fs = FileSystem.get(conf);
                        Path workingDir = new Path(".");
+                       FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf);
                        FileStatus[] files = fs.listStatus(workingDir);
                        for (FileStatus file : files) {
                                String fileName = file.getPath().toString().substring(
@@ -1828,8 +1832,9 @@ public class TestUtils
         */
        public static Path getFileInDirectory(String directory) {
                try {
-                       FileSystem fs = FileSystem.get(conf);
-                       FileStatus[] files = fs.listStatus(new Path(directory));
+                       Path path = new Path(directory);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+                       FileStatus[] files = fs.listStatus(path);
                        if (files.length != 1)
                                throw new IOException("requires exactly one file in directory " + directory);
 
@@ -1851,8 +1856,9 @@ public class TestUtils
         *            filename
         */
        public static void createFile(String filename) throws IOException {
-                       FileSystem fs = FileSystem.get(conf);
-                       fs.create(new Path(filename));
+               Path path = new Path(filename);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+               fs.create(path);
        }
 
        /**

