[SYSTEMML-1664] Fix handling of non-default DFS file URI schemes 

So far, SystemML's dfs file handling relied entirely on the configured
default fs implementation. This created problems when reading directly
from object stores (e.g., via swift, or s3) if the default hadoop
configuration (w/ local file system as the default fs implementation) is
on the class path. Similarly, it disallowed the access of local files
for read/write if hdfs is the default.

This patch fixes these issues by properly instantiating the dfs file
system handle, according to the given file URI schemes via new util
primitives to create global consistency for all I/O paths. Furthermore,
this patch also includes some minor cleanups such as the removal of
unnecessary validity checks for trustworthy file names (which did not
contain anymore rules given the generalization of file access policies).


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/557aae0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/557aae0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/557aae0f

Branch: refs/heads/master
Commit: 557aae0ff3e069295da39bb6e280f435e9514cd9
Parents: 88f4a46
Author: Matthias Boehm <[email protected]>
Authored: Sun Jun 4 00:47:53 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sun Jun 4 14:01:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/DMLScript.java    |  14 +-
 .../org/apache/sysml/api/jmlc/Connection.java   |   2 +-
 .../sysml/api/mlcontext/ScriptFactory.java      |  19 +--
 .../java/org/apache/sysml/conf/DMLConfig.java   |  10 +-
 .../apache/sysml/hops/recompile/Recompiler.java |   5 +-
 .../org/apache/sysml/parser/DataExpression.java |  39 ++----
 .../org/apache/sysml/parser/ParserWrapper.java  |   8 +-
 .../controlprogram/ParForProgramBlock.java      |   4 +-
 .../parfor/DataPartitionerLocal.java            |  12 +-
 .../parfor/DataPartitionerRemoteReducer.java    |   5 +-
 .../DataPartitionerRemoteSparkReducer.java      |   2 +-
 .../controlprogram/parfor/RemoteDPParForMR.java |   5 +-
 .../parfor/RemoteParForColocatedFileSplit.java  |  18 +--
 .../controlprogram/parfor/RemoteParForMR.java   |   4 +-
 .../parfor/ResultMergeLocalFile.java            |  18 +--
 .../parfor/ResultMergeRemoteMR.java             |   7 +-
 .../instructions/cp/VariableCPInstruction.java  |   5 +-
 .../ParameterizedBuiltinCPFileInstruction.java  |  12 +-
 .../instructions/spark/RandSPInstruction.java   |  20 ++-
 .../apache/sysml/runtime/io/FrameReader.java    |   2 +-
 .../runtime/io/FrameReaderBinaryBlock.java      |   4 +-
 .../sysml/runtime/io/FrameReaderTextCSV.java    |   2 +-
 .../sysml/runtime/io/FrameReaderTextCell.java   |   2 +-
 .../runtime/io/FrameWriterBinaryBlock.java      |   2 +-
 .../io/FrameWriterBinaryBlockParallel.java      |   4 +-
 .../sysml/runtime/io/FrameWriterTextCSV.java    |   4 +-
 .../runtime/io/FrameWriterTextCSVParallel.java  |   4 +-
 .../sysml/runtime/io/FrameWriterTextCell.java   |   2 +-
 .../runtime/io/FrameWriterTextCellParallel.java |   4 +-
 .../sysml/runtime/io/IOUtilFunctions.java       |  25 ++++
 .../apache/sysml/runtime/io/MatrixReader.java   |   2 +-
 .../sysml/runtime/io/ReaderBinaryBlock.java     |   4 +-
 .../runtime/io/ReaderBinaryBlockParallel.java   |   2 +-
 .../sysml/runtime/io/ReaderBinaryCell.java      |   2 +-
 .../apache/sysml/runtime/io/ReaderTextCSV.java  |   2 +-
 .../sysml/runtime/io/ReaderTextCSVParallel.java |   4 +-
 .../apache/sysml/runtime/io/ReaderTextCell.java |   2 +-
 .../runtime/io/ReaderTextCellParallel.java      |   2 +-
 .../sysml/runtime/io/WriterBinaryBlock.java     |  10 +-
 .../runtime/io/WriterBinaryBlockParallel.java   |   2 +-
 .../sysml/runtime/io/WriterBinaryCell.java      |   8 +-
 .../sysml/runtime/io/WriterMatrixMarket.java    |  26 ++--
 .../runtime/io/WriterMatrixMarketParallel.java  |   2 +-
 .../apache/sysml/runtime/io/WriterTextCSV.java  |  49 +++----
 .../sysml/runtime/io/WriterTextCSVParallel.java |   2 +-
 .../apache/sysml/runtime/io/WriterTextCell.java |  21 +--
 .../runtime/io/WriterTextCellParallel.java      |   2 +-
 .../apache/sysml/runtime/matrix/CleanupMR.java  |  15 +-
 .../org/apache/sysml/runtime/matrix/SortMR.java |  21 ++-
 .../matrix/mapred/CSVAssignRowIDMapper.java     |   6 +-
 .../runtime/matrix/mapred/CSVReblockMapper.java |   5 +-
 .../matrix/mapred/MRJobConfiguration.java       |   8 +-
 .../runtime/matrix/sort/CompactInputFormat.java |   4 +-
 .../matrix/sort/PickFromCompactInputFormat.java |   8 +-
 .../runtime/transform/ApplyTfBBMapper.java      |   6 +-
 .../sysml/runtime/transform/ApplyTfCSVMR.java   |   3 +-
 .../runtime/transform/ApplyTfCSVMapper.java     |   3 +-
 .../sysml/runtime/transform/DataTransform.java  |   7 +-
 .../sysml/runtime/transform/GenTfMtdMR.java     |   3 +-
 .../apache/sysml/runtime/transform/TfUtils.java |  27 ++--
 .../sysml/runtime/util/LocalFileUtils.java      |  31 -----
 .../sysml/runtime/util/MapReduceTool.java       | 138 +++++++++++--------
 .../apache/sysml/udf/lib/RemoveEmptyRows.java   |   3 +-
 .../org/apache/sysml/yarn/DMLAppMaster.java     |   3 +-
 .../org/apache/sysml/yarn/DMLYarnClient.java    |  15 +-
 .../functions/transform/ScalingTest.java        |   5 +-
 .../org/apache/sysml/test/utils/TestUtils.java  |  68 ++++-----
 67 files changed, 357 insertions(+), 428 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/api/DMLScript.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java 
b/src/main/java/org/apache/sysml/api/DMLScript.java
index 5d1f7eb..c927250 100644
--- a/src/main/java/org/apache/sysml/api/DMLScript.java
+++ b/src/main/java/org/apache/sysml/api/DMLScript.java
@@ -639,17 +639,13 @@ public class DMLScript
                                if(    fileName.startsWith("hdfs:")
                                        || fileName.startsWith("gpfs:") )
                                { 
-                                       if( 
!LocalFileUtils.validateExternalFilename(fileName, true) )
-                                               throw new 
LanguageException("Invalid (non-trustworthy) hdfs filename.");
-                                       FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
                                        Path scriptPath = new Path(fileName);
+                                       FileSystem fs = 
IOUtilFunctions.getFileSystem(scriptPath);
                                        in = new BufferedReader(new 
InputStreamReader(fs.open(scriptPath)));
                                }
                                // from local file system
                                else 
                                { 
-                                       if( 
!LocalFileUtils.validateExternalFilename(fileName, false) )
-                                               throw new 
LanguageException("Invalid (non-trustworthy) local filename.");
                                        in = new BufferedReader(new 
FileReader(fileName));
                                }
                                
@@ -969,14 +965,6 @@ public class DMLScript
                {
                        LOG.warn("Cannot run map/reduce tasks as user 
'"+userName+"'. Using tasktracker group '"+ttGroupName+"'.");              
                }
-               
-               //validate external filenames working directories
-               String localtmpdir = 
config.getTextValue(DMLConfig.LOCAL_TMP_DIR);
-               String hdfstmpdir = 
config.getTextValue(DMLConfig.SCRATCH_SPACE);
-               if( !LocalFileUtils.validateExternalFilename(localtmpdir, 
false) )
-                       throw new DMLRuntimeException("Invalid 
(non-trustworthy) local working directory.");
-               if( !LocalFileUtils.validateExternalFilename(hdfstmpdir, true) )
-                       throw new DMLRuntimeException("Invalid 
(non-trustworthy) hdfs working directory.");
        }
        
        public static void cleanupHadoopExecution( DMLConfig config ) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java 
b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index dc4ee64..6f25c29 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -232,8 +232,8 @@ public class Connection implements Closeable
                        if(    fname.startsWith("hdfs:") 
                                || fname.startsWith("gpfs:") ) 
                        { 
-                               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
                                Path scriptPath = new Path(fname);
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(scriptPath);
                                in = new BufferedReader(new 
InputStreamReader(fs.open(scriptPath)));
                        }
                        // from local file system

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java 
b/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java
index a01dde2..4a30f00 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java
@@ -30,8 +30,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 
 /**
  * Factory for creating DML and PYDML Script objects from strings, files, URLs,
@@ -331,13 +330,7 @@ public class ScriptFactory {
                }
                String filePath = file.getPath();
                try {
-                       if (!LocalFileUtils.validateExternalFilename(filePath, 
false)) {
-                               throw new MLContextException("Invalid 
(non-trustworthy) local filename: " + filePath);
-                       }
-                       String scriptString = FileUtils.readFileToString(file);
-                       return scriptString;
-               } catch (IllegalArgumentException e) {
-                       throw new MLContextException("Error trying to read 
script string from file: " + filePath, e);
+                       return FileUtils.readFileToString(file);
                } catch (IOException e) {
                        throw new MLContextException("Error trying to read 
script string from file: " + filePath, e);
                }
@@ -359,17 +352,11 @@ public class ScriptFactory {
                }
                try {
                        if (scriptFilePath.startsWith("hdfs:") || 
scriptFilePath.startsWith("gpfs:")) {
-                               if 
(!LocalFileUtils.validateExternalFilename(scriptFilePath, true)) {
-                                       throw new MLContextException("Invalid 
(non-trustworthy) hdfs/gpfs filename: " + scriptFilePath);
-                               }
-                               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
                                Path path = new Path(scriptFilePath);
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(path);
                                FSDataInputStream fsdis = fs.open(path);
                                return IOUtils.toString(fsdis);
                        } else {// from local file system
-                               if 
(!LocalFileUtils.validateExternalFilename(scriptFilePath, false)) {
-                                       throw new MLContextException("Invalid 
(non-trustworthy) local filename: " + scriptFilePath);
-                               }
                                File scriptFile = new File(scriptFilePath);
                                return FileUtils.readFileToString(scriptFile);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/conf/DMLConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/conf/DMLConfig.java 
b/src/main/java/org/apache/sysml/conf/DMLConfig.java
index ccd8889..3a21d95 100644
--- a/src/main/java/org/apache/sysml/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysml/conf/DMLConfig.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.parser.ParseException;
 import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.util.LocalFileUtils;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.NodeList;
@@ -175,16 +175,12 @@ public class DMLConfig
                if (_fileName.startsWith("hdfs:") ||
                    _fileName.startsWith("gpfs:") )  // config file from DFS
                {
-                       if( !LocalFileUtils.validateExternalFilename(_fileName, 
true) )
-                               throw new IOException("Invalid 
(non-trustworthy) hdfs config filename.");
-                       FileSystem DFS = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-            Path configFilePath = new Path(_fileName);
+                       Path configFilePath = new Path(_fileName);
+                       FileSystem DFS = 
IOUtilFunctions.getFileSystem(configFilePath);
             domTree = builder.parse(DFS.open(configFilePath));  
                }
                else  // config from local file system
                {
-                       if( !LocalFileUtils.validateExternalFilename(_fileName, 
false) )
-                               throw new IOException("Invalid 
(non-trustworthy) local config filename.");
                        domTree = builder.parse(_fileName);
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index 6c8ddea..9942035 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -30,7 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.wink.json4j.JSONObject;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -1965,10 +1964,8 @@ public class Recompiler
                {
                        //get meta data filename
                        String mtdname = 
DataExpression.getMTDFileName(dop.getFileName());
-                       
-                       JobConf job = ConfigurationManager.getCachedJobConf();
-                       FileSystem fs = FileSystem.get(job);
                        Path path = new Path(mtdname);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(mtdname);
                        if( fs.exists(path) ){
                                BufferedReader br = null;
                                try

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/parser/DataExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java 
b/src/main/java/org/apache/sysml/parser/DataExpression.java
index 51a51c9..47f85f4 100644
--- a/src/main/java/org/apache/sysml/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysml/parser/DataExpression.java
@@ -37,7 +37,6 @@ import 
org.apache.sysml.parser.LanguageException.LanguageErrorCodes;
 import org.apache.sysml.parser.common.CustomErrorListener;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.util.LocalFileUtils;
 import org.apache.sysml.runtime.util.MapReduceTool;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.utils.JSONHelper;
@@ -540,13 +539,7 @@ public class DataExpression extends DataIdentifier
        }
        
        public static String getMTDFileName(String inputFileName) throws 
LanguageException {
-               String mtdName = inputFileName + ".mtd";
-               
-               //validate read filename
-               if( !LocalFileUtils.validateExternalFilename(mtdName, true) )
-                       throw new LanguageException("Invalid (non-trustworthy) 
hdfs read filename.");
-
-               return mtdName;
+               return inputFileName + ".mtd";
        }
        
        /**
@@ -1065,11 +1058,6 @@ public class DataExpression extends DataIdentifier
                        }*/
                        
                        //validate read filename
-                       String fnameWrite = getVarParam(IO_FILENAME).toString();
-                       if( 
!LocalFileUtils.validateExternalFilename(fnameWrite, true) ) //always 
unconditional
-                               raiseValidateError("Invalid (non-trustworthy) 
hdfs write filename.", false);
-               
-                       
                        if (getVarParam(FORMAT_TYPE) == null || 
getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase("text"))
                                getOutput().setBlockDimensions(-1, -1);
                        else if 
(getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase("binary"))
@@ -1923,7 +1911,7 @@ public class DataExpression extends DataIdentifier
                                        continue;
                                BufferedReader br = null;
                                try {
-                                       FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
+                                       FileSystem fs = 
IOUtilFunctions.getFileSystem(childPath);
                                        br = new BufferedReader(new 
InputStreamReader(fs.open(childPath)));
                                        JSONObject childObj = 
JSONHelper.parse(br);
                                        for( Object obj : childObj.entrySet() ){
@@ -1947,8 +1935,9 @@ public class DataExpression extends DataIdentifier
                {
                        BufferedReader br = null;
                        try {
-                               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-                               br = new BufferedReader(new 
InputStreamReader(fs.open(new Path(filename))));
+                               Path path = new Path(filename);
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(path);
+                               br = new BufferedReader(new 
InputStreamReader(fs.open(path)));
                                retVal = new JSONObject(br);
                        } 
                        catch (Exception e){
@@ -1972,19 +1961,16 @@ public class DataExpression extends DataIdentifier
                
                try 
                {
-                       FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-                       Path pt = new Path(filename);
-                       if (fs.exists(pt)){
-                               exists = true;
-                       }
-                       
-                       boolean getFileStatusIsDir = 
fs.getFileStatus(pt).isDirectory();
+                       Path path = new Path(filename);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path);
+                       exists = fs.exists(path);
+                       boolean getFileStatusIsDir = 
fs.getFileStatus(path).isDirectory();
                        
                        if (exists && getFileStatusIsDir){
                                raiseValidateError("MatrixMarket files as 
directories not supported", conditional);
                        }
                        else if (exists) {
-                               BufferedReader in = new BufferedReader(new 
InputStreamReader(fs.open(pt)));
+                               BufferedReader in = new BufferedReader(new 
InputStreamReader(fs.open(path)));
                                try
                                {
                                        retVal[0] = in.readLine();
@@ -2027,8 +2013,9 @@ public class DataExpression extends DataIdentifier
                {
                        BufferedReader in = null;
                        try {
-                               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-                               in = new BufferedReader(new 
InputStreamReader(fs.open(new Path(inputFileName))));
+                               Path path = new Path(inputFileName);
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(path);
+                               in = new BufferedReader(new 
InputStreamReader(fs.open(path)));
                                String headerLine = new String("");             
        
                                if (in.ready())
                                        headerLine = in.readLine();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/parser/ParserWrapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParserWrapper.java 
b/src/main/java/org/apache/sysml/parser/ParserWrapper.java
index 2711b6a..9a32b00 100644
--- a/src/main/java/org/apache/sysml/parser/ParserWrapper.java
+++ b/src/main/java/org/apache/sysml/parser/ParserWrapper.java
@@ -33,10 +33,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.common.CustomErrorListener.ParseIssue;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
-import org.apache.sysml.runtime.util.LocalFileUtils;
 
 /**
  * Base class for all dml parsers in order to make the various compilation 
chains
@@ -100,18 +98,14 @@ public abstract class ParserWrapper {
                                || script.startsWith("gpfs:") ) 
                        {
                                LOG.debug("Looking for the following file in 
HDFS or GPFS: " + script);
-                               if( 
!LocalFileUtils.validateExternalFilename(script, true) )
-                                       throw new LanguageException("Invalid 
(non-trustworthy) hdfs filename.");
-                               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
                                Path scriptPath = new Path(script);
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(scriptPath);
                                in = new BufferedReader(new 
InputStreamReader(fs.open(scriptPath)));
                        }
                        // from local file system
                        else 
                        {
                                LOG.debug("Looking for the following file in 
the local file system: " + script);
-                               if( 
!LocalFileUtils.validateExternalFilename(script, false) )
-                                       throw new LanguageException("Invalid 
(non-trustworthy) local filename.");
                                if (Files.exists(Paths.get(script)))
                                        in = new BufferedReader(new 
FileReader(script));
                                else  // check in scripts/ directory for file 
(useful for tests)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index f3de422..c9dcc22 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -1633,7 +1633,7 @@ public class ParForProgramBlock extends ForProgramBlock
                try
                {
                        Path path = new Path(fname);
-                       FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path);
                        br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));
                
                        boolean flagFirst = true; //workaround for keeping gen 
order
@@ -1662,7 +1662,7 @@ public class ParForProgramBlock extends ForProgramBlock
                try
                {
                        Path path = new Path( fname );
-                       FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path);
                        br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));
                
                        Task t = null;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
index eb273b6..f0b5674 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -239,7 +239,7 @@ public class DataPartitionerLocal extends DataPartitioner
                        //check and add input path
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                        Path path = new Path(fname);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        
                        //prepare sequence file reader, and write to local 
staging area 
                        LinkedList<Cell> buffer = new LinkedList<Cell>();
@@ -327,7 +327,7 @@ public class DataPartitionerLocal extends DataPartitioner
                        //check and add input path
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                        Path path = new Path(fname);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        
                        //prepare sequence file reader, and write to local 
staging area
                        MatrixIndexes key = new MatrixIndexes(); 
@@ -400,7 +400,7 @@ public class DataPartitionerLocal extends DataPartitioner
                        //check and add input path
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                        Path path = new Path(fname);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        
                        //prepare sequence file reader, and write to local 
staging area
                        MatrixIndexes key = new MatrixIndexes(); 
@@ -601,8 +601,8 @@ public class DataPartitionerLocal extends DataPartitioner
                throws IOException
        {
                long key = getKeyFromFilePath(lpdir);
-               FileSystem fs = FileSystem.get(job);
                Path path =  new Path(dir+"/"+key);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, 
path, MatrixIndexes.class, MatrixBlock.class); //beware ca 50ms
 
                try
@@ -637,8 +637,8 @@ public class DataPartitionerLocal extends DataPartitioner
                throws IOException
        {
                long key = getKeyFromFilePath(lpdir);
-               FileSystem fs = FileSystem.get(job);
                Path path =  new Path(dir+"/"+key);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, 
path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
        
                try
@@ -667,8 +667,8 @@ public class DataPartitionerLocal extends DataPartitioner
                throws IOException
        {
                long key = getKeyFromFilePath(lpdir);
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path(dir+"/"+key);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                BufferedWriter out = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));          
                try
                {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java
index d493e90..21d5a64 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java
@@ -104,8 +104,9 @@ public class DataPartitionerRemoteReducer
                        _fnameNew = fnameNew;
                        
                        try {
-                               _fs = FileSystem.get(_job);
-                       } catch (IOException e) {
+                               _fs = IOUtilFunctions.getFileSystem(new 
Path(fnameNew), job);
+                       } 
+                       catch (IOException e) {
                                throw new RuntimeException(e);
                        }
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index 174c544..ad228eb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -66,8 +66,8 @@ public class DataPartitionerRemoteSparkReducer implements 
VoidFunction<Tuple2<Lo
                {
                        //create sequence file writer
                        Configuration job = new 
Configuration(ConfigurationManager.getCachedJobConf());
-                       FileSystem fs = FileSystem.get(job);
                        Path path = new Path(_fnameNew + File.separator + key);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class, 
                                        
job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096),
                     (short)_replication, fs.getDefaultBlockSize(), null, new 
SequenceFile.Metadata()); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
index a8fda17..60d23c6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java
@@ -247,20 +247,19 @@ public class RemoteDPParForMR
        {
                HashMap<Long,LocalVariableMap> tmp = new 
HashMap<Long,LocalVariableMap>();
 
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path(fname);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                LongWritable key = new LongWritable(); //workerID
                Text value = new Text();               //serialized var header 
(incl filename)
                
                int countAll = 0;
                for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
                {
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(FileSystem.get(job),lpath,job);
+                       SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                        try
                        {
                                while( reader.next(key, value) )
                                {
-                                       
//System.out.println("key="+key.get()+", value="+value.toString());
                                        if( !tmp.containsKey( key.get() ) )
                                        tmp.put(key.get(), new LocalVariableMap 
());       
                                        Object[] dat = 
ProgramConverter.parseDataObject( value.toString() );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java
index e4895cb..ade08e6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java
@@ -81,7 +81,7 @@ public class RemoteParForColocatedFileSplit extends FileSplit
                //time.start();
                
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(getPath(), job);
                
                //read task string
                LongWritable key = new LongWritable();
@@ -126,23 +126,7 @@ public class RemoteParForColocatedFileSplit extends 
FileSplit
                                for( BlockLocation bl : tmp1 )
                                        countHosts(hosts, bl.getHosts());
                        }
-                       
-                       /*
-                       int lFrom  = t.getIterations().get(0).getIntValue();
-                       int lTo    = t.getIterations().get(1).getIntValue();
-                       int lIncr  = t.getIterations().get(2).getIntValue();    
                        
-                       for( int i=lFrom; i<=lTo; i+=lIncr )
-                       {
-                               String fname = _fname+"/"+String.valueOf( 
((i-_offset)/_blen+_offset) );
-                               FileSystem fs = FileSystem.get(job);
-                               FileStatus status = fs.getFileStatus(new 
Path(fname)); 
-                               BlockLocation[] tmp1 = 
fs.getFileBlockLocations(status, 0, status.getLen());
-                               for( BlockLocation bl : tmp1 )
-                                       countHosts(hosts, bl.getHosts());
-                       }*/
                }
-
-               //System.out.println("Get locations "+time.stop()+"");
                
                //majority consensus on top host
                return getTopHosts(hosts);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
index 465136d..67c3368 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java
@@ -258,15 +258,15 @@ public class RemoteParForMR
        {
                HashMap<Long,LocalVariableMap> tmp = new 
HashMap<Long,LocalVariableMap>();
 
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path(fname);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                LongWritable key = new LongWritable(); //workerID
                Text value = new Text();               //serialized var header 
(incl filename)
                
                int countAll = 0;
                for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
                {
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(FileSystem.get(job),lpath,job);
+                       SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,job);
                        try
                        {
                                while( reader.next(key, value) )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 8e201d0..7dae670 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -222,8 +222,8 @@ public class ResultMergeLocalFile extends ResultMerge
                        
                        //actual merge
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-                       FileSystem fs = FileSystem.get(job);
                        Path path = new Path( fnameNew );
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        BufferedWriter out = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));          
                        
                        String valueStr = null;
@@ -324,8 +324,8 @@ public class ResultMergeLocalFile extends ResultMerge
                        
                        //actual merge
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-                       FileSystem fs = FileSystem.get(job);
                        Path path = new Path( fnameNew );                       
                
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        SequenceFile.Writer out = new SequenceFile.Writer(fs, 
job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
                        
                        MatrixIndexes key = new MatrixIndexes();
@@ -474,8 +474,8 @@ public class ResultMergeLocalFile extends ResultMerge
                MatrixBlock value = new MatrixBlock();
                
                JobConf tmpJob = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(tmpJob);
                Path tmpPath = new Path(mo.getFileName());
+               FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob);
                
                for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath))
                {
@@ -566,7 +566,7 @@ public class ResultMergeLocalFile extends ResultMerge
        {               
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path(mo.getFileName());
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                LinkedList<Cell> buffer = new LinkedList<Cell>();
                MatrixIndexes key = new MatrixIndexes();
@@ -649,8 +649,8 @@ public class ResultMergeLocalFile extends ResultMerge
                throws IOException, DMLRuntimeException
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fnameNew );       
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
                long rlen = mc.getRows();
@@ -745,8 +745,8 @@ public class ResultMergeLocalFile extends ResultMerge
                throws IOException, DMLRuntimeException
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fnameNew );       
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
                long rlen = mc.getRows();
@@ -882,8 +882,8 @@ public class ResultMergeLocalFile extends ResultMerge
                throws IOException, DMLRuntimeException
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fnameNew );       
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
                long rlen = mc.getRows();
@@ -1009,9 +1009,9 @@ public class ResultMergeLocalFile extends ResultMerge
                throws CacheException, IOException
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fnameNew );
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                //create output dir
                fs.mkdirs(path);
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index 98a063e..e2377c4 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -39,6 +39,7 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
@@ -194,8 +195,10 @@ public class ResultMergeRemoteMR extends ResultMerge
                        /////
                        //configure the MR job
                        if( withCompare ) {
-                               pathCompare = new 
Path(fname).makeQualified(FileSystem.get(job));
-                               MRJobConfiguration.setResultMergeInfo(job, 
pathCompare.toString(), ii, 
LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, 
brlen, bclen);
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(pathNew, job);
+                               pathCompare = new Path(fname).makeQualified(fs);
+                               MRJobConfiguration.setResultMergeInfo(job, 
pathCompare.toString(), ii, 
+                                       
LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, 
brlen, bclen);
                        }
                        else
                                MRJobConfiguration.setResultMergeInfo(job, 
"null", ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), 
rlen, clen, bclen, bclen);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
index 907c102..4976f26 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java
@@ -25,9 +25,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.api.DMLScript;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.lops.UnaryCP;
 import org.apache.sysml.parser.Expression.DataType;
@@ -899,8 +897,7 @@ public class VariableCPInstruction extends CPInstruction
                        MapReduceTool.writeObjectToHDFS(scalar.getValue(), 
fname);
                        MapReduceTool.writeScalarMetaDataFile(fname +".mtd", 
input1.getValueType());
 
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(fname);
                        if (fs instanceof LocalFileSystem) {
                                Path path = new Path(fname);
                                
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
index 715bf4f..6a7127f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java
@@ -267,7 +267,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        //prepare input
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                        Path path = new Path(fnameOld);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        if( !fs.exists(path) )  
                                throw new IOException("File "+fnameOld+" does 
not exist on HDFS.");
                        FileInputFormat.addInputPath(job, path); 
@@ -321,7 +321,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        //prepare input
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                        Path path = new Path(fnameOld);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        if( !fs.exists(path) )  
                                throw new IOException("File "+fnameOld+" does 
not exist on HDFS.");
                        
@@ -379,7 +379,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        //prepare input
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                        Path path = new Path(fnameOld);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        if( !fs.exists(path) )  
                                throw new IOException("File "+fnameOld+" does 
not exist on HDFS.");
                        
@@ -631,7 +631,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        //prepare input
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                        Path path = new Path(fnameNew);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        String metaOut = stagingDir+"/meta";
 
                        //prepare output
@@ -732,7 +732,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        //prepare input
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                        Path path = new Path(fnameNew);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        String metaOut = stagingDir+"/meta";
        
                        //prepare output
@@ -883,7 +883,7 @@ public class ParameterizedBuiltinCPFileInstruction extends 
ParameterizedBuiltinC
                        //prepare input
                        JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                        Path path = new Path(fnameNew);
-                       FileSystem fs = FileSystem.get(job);
+                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
                        String metaOut = stagingDir+"/meta";
        
                        //prepare output

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index b2f3503..b075076 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -43,7 +43,6 @@ import scala.Tuple2;
 
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.DataGenOp;
 import org.apache.sysml.hops.Hop.DataGenMethod;
 import org.apache.sysml.hops.OptimizerUtils;
@@ -385,12 +384,12 @@ public class RandSPInstruction extends UnarySPInstruction
                //b) file-based seed rdd construction (for robustness wrt large 
number of blocks)
                else
                {
-                       String path = 
LibMatrixDatagen.generateUniqueSeedPath(dir);
+                       Path path = new 
Path(LibMatrixDatagen.generateUniqueSeedPath(dir));
                        PrintWriter pw = null;
                        try
                        {
-                               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-                               pw = new PrintWriter(fs.create(new Path(path)));
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(path);
+                               pw = new PrintWriter(fs.create(path));
                                StringBuilder sb = new StringBuilder();
                                for( long i=0; i<numBlocks; i++ ) {
                                        sb.append(1 + i/numColBlocks);
@@ -416,7 +415,7 @@ public class RandSPInstruction extends UnarySPInstruction
                        
                        //create seeds rdd 
                        seedsRDD = sec.getSparkContext()
-                                       .textFile(path, numPartitions)
+                                       .textFile(path.toString(), 
numPartitions)
                                        .mapToPair(new ExtractSeedTuple());
                }
                
@@ -476,13 +475,12 @@ public class RandSPInstruction extends UnarySPInstruction
                //b) file-based offset rdd construction (for robustness wrt 
large number of blocks)
                else
                {
-                       String path = 
LibMatrixDatagen.generateUniqueSeedPath(dir);
+                       Path path = new 
Path(LibMatrixDatagen.generateUniqueSeedPath(dir));
                        
                        PrintWriter pw = null;
-                       try
-                       {
-                               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-                               pw = new PrintWriter(fs.create(new Path(path)));
+                       try {
+                               FileSystem fs = 
IOUtilFunctions.getFileSystem(path);
+                               pw = new PrintWriter(fs.create(path));
                                for( long i=0; i<numBlocks; i++ ) {
                                        double off = seq_from + 
seq_incr*i*rowsInBlock;
                                        pw.println(off);
@@ -500,7 +498,7 @@ public class RandSPInstruction extends UnarySPInstruction
                        
                        //create seeds rdd 
                        offsetsRDD = sec.getSparkContext()
-                                       .textFile(path, numPartitions)
+                                       .textFile(path.toString(), 
numPartitions)
                                        .map(new ExtractOffsetTuple());
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 321735d..03271d8 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -153,7 +153,7 @@ public abstract class FrameReader
                        throw new IOException("File "+path.toString()+" does 
not exist on HDFS/LFS.");
        
                //check for empty file
-               if( MapReduceTool.isFileEmpty( fs, path.toString() ) )
+               if( MapReduceTool.isFileEmpty(fs, path) )
                        throw new EOFException("Empty input file "+ 
path.toString() +".");              
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index 32feea3..e08aa96 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -50,8 +50,8 @@ public class FrameReaderBinaryBlock extends FrameReader
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname ); 
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
@@ -128,8 +128,8 @@ public class FrameReaderBinaryBlock extends FrameReader
        {
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname ); 
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                LongWritable key = new LongWritable();
                FrameBlock value = new FrameBlock();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index e86cbe2..9e10f2c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -62,8 +62,8 @@ public class FrameReaderTextCSV extends FrameReader
        {
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                FileInputFormat.addInputPath(job, path);
                
                //check existence and non-empty file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
index 548452f..55128f8 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
@@ -58,8 +58,8 @@ public class FrameReaderTextCell extends FrameReader
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
index 563aeb0..819b7d0 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java
@@ -63,7 +63,7 @@ public class FrameWriterBinaryBlock extends FrameWriter
        protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, 
FrameBlock src, long rlen, long clen )
                        throws IOException, DMLRuntimeException
        {
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
                int blen = ConfigurationManager.getBlocksize();
                
                //sequential write to single file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
index 4a8c748..a25fe75 100644
--- 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
+++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java
@@ -65,8 +65,8 @@ public class FrameWriterBinaryBlockParallel extends 
FrameWriterBinaryBlock
                }
                
                //create directory for concurrent tasks
-               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
-               FileSystem fs = FileSystem.get(job);
+               MapReduceTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
                
                //create and execute write tasks
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
index 24bdfce..83f3861 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
@@ -74,8 +74,8 @@ public class FrameWriterTextCSV extends FrameWriter
        protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock 
src, long rlen, long clen, CSVFileFormatProperties csvprops ) 
                throws IOException
        {
-               FileSystem fs = FileSystem.get(job);
-        
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                //sequential write to single text file
                writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, 
csvprops);        
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
index 9a3a33d..10f0827 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java
@@ -67,8 +67,8 @@ public class FrameWriterTextCSVParallel extends 
FrameWriterTextCSV
                }
                
                //create directory for concurrent tasks
-               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
-               FileSystem fs = FileSystem.get(job);
+               MapReduceTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //create and execute tasks
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
index 0768d4a..7263e7a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java
@@ -63,7 +63,7 @@ public class FrameWriterTextCell extends FrameWriter
        protected void writeTextCellFrameToHDFS( Path path, JobConf job, 
FrameBlock src, long rlen, long clen )
                throws IOException
        {
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //sequential write to single text file
                writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen);     

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
index 8a32952..8eed53c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java
@@ -63,8 +63,8 @@ public class FrameWriterTextCellParallel extends 
FrameWriterTextCell
                }
                
                //create directory for concurrent tasks
-               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
-               FileSystem fs = FileSystem.get(job);
+               MapReduceTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //create and execute tasks
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index df8174f..44d9aee 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.input.ReaderInputStream;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -53,6 +55,29 @@ public class IOUtilFunctions
 
        private static final char CSV_QUOTE_CHAR = '"';
 
+       public static FileSystem getFileSystem(String fname) throws IOException 
{
+               return getFileSystem(new Path(fname),
+                       ConfigurationManager.getCachedJobConf());
+       }
+       
+       public static FileSystem getFileSystem(Path fname) throws IOException {
+               return getFileSystem(fname, 
+                       ConfigurationManager.getCachedJobConf());
+       }
+       
+       public static FileSystem getFileSystem(Path fname, Configuration conf) 
throws IOException {
+               return FileSystem.get(fname.toUri(), conf);
+       }
+       
+       public static boolean isSameFileScheme(Path path1, Path path2) {
+               if( path1 == null || path2 == null || path1.toUri() == null || 
path2.toUri() == null)
+                       return false;
+               String scheme1 = path1.toUri().getScheme();
+               String scheme2 = path2.toUri().getScheme();
+               return (scheme1 == null && scheme2 == null)
+                       || scheme1.equals(scheme2);
+       }
+       
        public static void closeSilently( Closeable io ) {
                try {
                        if( io != null )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java 
b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index 8e624a6..dbd1602 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -134,7 +134,7 @@ public abstract class MatrixReader
                        throw new IOException("File "+path.toString()+" does 
not exist on HDFS/LFS.");
        
                //check for empty file
-               if( MapReduceTool.isFileEmpty( fs, path.toString() ) )
+               if( MapReduceTool.isFileEmpty(fs, path) )
                        throw new EOFException("Empty input file "+ 
path.toString() +".");
                
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 4c8549e..16241c9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -58,8 +58,8 @@ public class ReaderBinaryBlock extends MatrixReader
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = _localFS ? FileSystem.getLocal(job) : 
FileSystem.get(job);
                Path path = new Path( (_localFS ? "file:///" : "") + fname); 
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
@@ -90,8 +90,8 @@ public class ReaderBinaryBlock extends MatrixReader
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = _localFS ? FileSystem.getLocal(job) : 
FileSystem.get(job);
                Path path = new Path( (_localFS ? "file:///" : "") + fname); 
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 387eac8..2afdb0e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -60,8 +60,8 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = _localFS ? FileSystem.getLocal(job) : 
FileSystem.get(job);
                Path path = new Path( (_localFS ? "file:///" : "") + fname); 
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index dcf9e7b..2bdc25e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -44,8 +44,8 @@ public class ReaderBinaryCell extends MatrixReader
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index 6256955..27637ab 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -60,8 +60,8 @@ public class ReaderTextCSV extends MatrixReader
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 75b3bd9..88388f4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -74,9 +74,9 @@ public class ReaderTextCSVParallel extends MatrixReader
        {
                // prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path(fname);
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                FileInputFormat.addInputPath(job, path);
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 3b93c33..fd3af44 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -59,8 +59,8 @@ public class ReaderTextCell extends MatrixReader
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index 9501b6d..5e0e043 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -83,8 +83,8 @@ public class ReaderTextCellParallel extends MatrixReader
        {
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
index cdc49fb..78ffc0b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java
@@ -50,9 +50,9 @@ public class WriterBinaryBlock extends MatrixWriter
        {
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
 
@@ -76,8 +76,8 @@ public class WriterBinaryBlock extends MatrixWriter
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path( fname );
-               FileSystem fs = FileSystem.get(job);
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                SequenceFile.Writer writer = null;
                try {
                        writer = new SequenceFile.Writer(fs, job, path,
@@ -268,7 +268,7 @@ public class WriterBinaryBlock extends MatrixWriter
                        throws IOException, DMLRuntimeException
        {
                boolean sparse = src.isInSparseFormat();
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                //set up preferred custom serialization framework for binary 
block format
                if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
index ccd897c..6f33011 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java
@@ -64,7 +64,7 @@ public class WriterBinaryBlockParallel extends 
WriterBinaryBlock
                }
 
                //create directory for concurrent tasks
-               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               MapReduceTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
                
                //create and execute write tasks
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
index c451f39..a072a6b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java
@@ -51,7 +51,7 @@ public class WriterBinaryCell extends MatrixWriter
                //core write
                writeBinaryCellMatrixToHDFS(path, job, src, rlen, clen, brlen, 
bclen);
 
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
        }
 
@@ -62,8 +62,8 @@ public class WriterBinaryCell extends MatrixWriter
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path( fname );
-               FileSystem fs = FileSystem.get(job);
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                SequenceFile.Writer writer = null;
                try {
                        writer = new SequenceFile.Writer(fs, job, path,
@@ -86,7 +86,7 @@ public class WriterBinaryCell extends MatrixWriter
        {
                boolean sparse = src.isInSparseFormat();
                boolean entriesWritten = false;
-               FileSystem fs = FileSystem.get(job);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, 
path, MatrixIndexes.class, MatrixCell.class);
                
                MatrixIndexes indexes = new MatrixIndexes();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
index 9838d01..a2e2013 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java
@@ -52,9 +52,9 @@ public class WriterMatrixMarket extends MatrixWriter
                                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
                        
@@ -69,7 +69,7 @@ public class WriterMatrixMarket extends MatrixWriter
                throws IOException, DMLRuntimeException 
        {
                Path path = new Path( fname );
-               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
                
                FSDataOutputStream writer = null;
                try {
@@ -172,13 +172,13 @@ public class WriterMatrixMarket extends MatrixWriter
                
                  Path src = new Path (srcFileName);
              Path merge = new Path (fileName);
-             FileSystem hdfs = FileSystem.get(conf);
-           
-             if (hdfs.exists (merge)) {
-               hdfs.delete(merge, true);
+             FileSystem fs = IOUtilFunctions.getFileSystem(src, conf);
+                       
+             if (fs.exists (merge)) {
+               fs.delete(merge, true);
              }
         
-             OutputStream out = hdfs.create(merge, true);
+             OutputStream out = fs.create(merge, true);
 
              // write out the header first 
              StringBuilder  sb = new StringBuilder();
@@ -189,12 +189,12 @@ public class WriterMatrixMarket extends MatrixWriter
              out.write (sb.toString().getBytes());
 
              // if the source is a directory
-             if (hdfs.getFileStatus(src).isDirectory()) {
+             if (fs.getFileStatus(src).isDirectory()) {
                try {
-                 FileStatus[] contents = hdfs.listStatus(src);
+                 FileStatus[] contents = fs.listStatus(src);
                  for (int i = 0; i < contents.length; i++) {
                    if (!contents[i].isDirectory()) {
-                      InputStream in = hdfs.open (contents[i].getPath());
+                      InputStream in = fs.open (contents[i].getPath());
                       try {
                         IOUtils.copyBytes (in, out, conf, false);
                       }  finally {
@@ -205,10 +205,10 @@ public class WriterMatrixMarket extends MatrixWriter
                 } finally {
                         IOUtilFunctions.closeSilently(out);
                 }
-             } else if (hdfs.isFile(src))  {
+             } else if (fs.isFile(src))  {
                InputStream in = null;
                try {
-                 in = hdfs.open (src);
+                 in = fs.open (src);
                  IOUtils.copyBytes (in, out, conf, true);
                } 
                finally {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
index f3c1ce8..1fed377 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java
@@ -62,7 +62,7 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket
                }
                
                //create directory for concurrent tasks
-               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               MapReduceTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
 
                //create and execute tasks
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
index 479ce40..6c20baa 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java
@@ -64,9 +64,9 @@ public class WriterTextCSV extends MatrixWriter
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
                        
@@ -81,9 +81,9 @@ public class WriterTextCSV extends MatrixWriter
                throws IOException, DMLRuntimeException 
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                MatrixBlock src = new MatrixBlock((int)rlen, 1, true);
                writeCSVMatrixToHDFS(path, job, fs, src, _props);
 
@@ -245,7 +245,7 @@ public class WriterTextCSV extends MatrixWriter
 
                Path srcFilePath = new Path(srcFileName);
                Path destFilePath = new Path(destFileName);
-               FileSystem hdfs = FileSystem.get(conf);
+               FileSystem fs = IOUtilFunctions.getFileSystem(srcFilePath, 
conf);
                
                if ( !_props.hasHeader() ) {
                        // simply move srcFile to destFile
@@ -259,22 +259,17 @@ public class WriterTextCSV extends MatrixWriter
                         */
                        
                        // delete the destination file, if exists already
-                       //boolean ret1 = 
-                       hdfs.delete(destFilePath, true);
+                       fs.delete(destFilePath, true);
                        
                        // Create /user/biadmin/csv/temp/out/file.csv so that 
..../temp/out/ is created.
-                       //boolean ret2 = 
-                       hdfs.createNewFile(destFilePath);
+                       fs.createNewFile(destFilePath);
                        
                        // delete the file "file.csv" but preserve the 
directory structure /user/biadmin/csv/temp/out/
-                       //boolean ret3 = 
-                       hdfs.delete(destFilePath, true);
+                       fs.delete(destFilePath, true);
                        
                        // finally, move the data to destFilePath = 
/user/biadmin/csv/temp/out/file.csv
-                       //boolean ret4 = 
-                       hdfs.rename(srcFilePath, destFilePath);
+                       fs.rename(srcFilePath, destFilePath);
 
-                       //System.out.println("Return values = del:" + ret1 + ", 
createNew:" + ret2 + ", del:" + ret3 + ", rename:" + ret4);
                        return;
                }
        
@@ -287,11 +282,11 @@ public class WriterTextCSV extends MatrixWriter
                }
                sb.append('\n');
 
-               if (hdfs.isDirectory(srcFilePath)) {
+               if (fs.isDirectory(srcFilePath)) {
 
                        // compute sorted order among part files
                        ArrayList<Path> files=new ArrayList<Path>();
-                       for(FileStatus stat: hdfs.listStatus(srcFilePath, 
CSVReblockMR.hiddenFileFilter))
+                       for(FileStatus stat: fs.listStatus(srcFilePath, 
CSVReblockMR.hiddenFileFilter))
                                files.add(stat.getPath());
                        Collections.sort(files);
                
@@ -300,14 +295,14 @@ public class WriterTextCSV extends MatrixWriter
                        
                        // create a temp file, and add header and contents of 
first part
                        Path tmp = new Path(firstpart.toString() + ".tmp");
-                       OutputStream out = hdfs.create(tmp, true);
+                       OutputStream out = fs.create(tmp, true);
                        out.write(sb.toString().getBytes());
                        sb.setLength(0);
                        
                        // copy rest of the data from firstpart
                        InputStream in = null;
                        try {
-                               in = hdfs.open(firstpart);
+                               in = fs.open(firstpart);
                                IOUtils.copyBytes(in, out, conf, true);
                        } finally {
                                IOUtilFunctions.closeSilently(in);
@@ -315,18 +310,18 @@ public class WriterTextCSV extends MatrixWriter
                        }
                        
                        // rename tmp to firstpart
-                       hdfs.delete(firstpart, true);
-                       hdfs.rename(tmp, firstpart);
+                       fs.delete(firstpart, true);
+                       fs.rename(tmp, firstpart);
                        
                        // rename srcfile to destFile
-                       hdfs.delete(destFilePath, true);
-                       hdfs.createNewFile(destFilePath); // force the creation 
of directory structure
-                       hdfs.delete(destFilePath, true);  // delete the file, 
but preserve the directory structure
-                       hdfs.rename(srcFilePath, destFilePath); // move the 
data 
+                       fs.delete(destFilePath, true);
+                       fs.createNewFile(destFilePath); // force the creation 
of directory structure
+                       fs.delete(destFilePath, true);  // delete the file, but 
preserve the directory structure
+                       fs.rename(srcFilePath, destFilePath); // move the data 
                
-               } else if (hdfs.isFile(srcFilePath)) {
+               } else if (fs.isFile(srcFilePath)) {
                        // create destination file
-                       OutputStream out = hdfs.create(destFilePath, true);
+                       OutputStream out = fs.create(destFilePath, true);
                        
                        // write header
                        out.write(sb.toString().getBytes());
@@ -335,7 +330,7 @@ public class WriterTextCSV extends MatrixWriter
                        // copy the data from srcFile
                        InputStream in = null;
                        try {
-                               in = hdfs.open(srcFilePath);
+                               in = fs.open(srcFilePath);
                                IOUtils.copyBytes(in, out, conf, true);
                        } 
                        finally {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
index eb82df1..581225f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java
@@ -64,7 +64,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
                }
                
                //create directory for concurrent tasks
-               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               MapReduceTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
                
                //create and execute tasks
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
index 9389bdc..432d363 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java
@@ -47,9 +47,9 @@ public class WriterTextCell extends MatrixWriter
                                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
-
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+               
                //if the file already exists on HDFS, remove it.
                MapReduceTool.deleteFileIfExistOnHDFS( fname );
                        
@@ -64,16 +64,10 @@ public class WriterTextCell extends MatrixWriter
                throws IOException, DMLRuntimeException 
        {
                Path path = new Path( fname );
-               FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-               
-               FSDataOutputStream writer = null;
-               try{
-                       writer = fs.create(path);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               try( FSDataOutputStream writer = fs.create(path) ){
                        writer.writeBytes("1 1 0");
                }
-               finally{
-                       IOUtilFunctions.closeSilently(writer);
-               }
                
                IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
        }
@@ -90,10 +84,8 @@ public class WriterTextCell extends MatrixWriter
        {
                boolean sparse = src.isInSparseFormat();
                int clen = src.getNumColumns();
-               
-               BufferedWriter br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));           
 
-               try
+               try( BufferedWriter br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true))) )
                {
                        //for obj reuse and preventing repeated buffer 
re-allocations
                        StringBuilder sb = new StringBuilder();
@@ -144,8 +136,5 @@ public class WriterTextCell extends MatrixWriter
                                br.write("1 1 0\n");
                        }
                }
-               finally {
-                       IOUtilFunctions.closeSilently(br);
-               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
index 208a9c8..1c08459 100644
--- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java
@@ -61,7 +61,7 @@ public class WriterTextCellParallel extends WriterTextCell
                }
                
                //create directory for concurrent tasks
-               MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               MapReduceTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
 
                //create and execute tasks
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
index 3dee28d..118016a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.lib.NLineInputFormat;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -110,22 +109,14 @@ public class CleanupMR
        private static void writeCleanupTasksToFile(Path path, int numTasks)
                throws DMLRuntimeException, IOException
        {
-               BufferedWriter br = null;
-               try
-               {
-                       FileSystem fs = 
FileSystem.get(ConfigurationManager.getCachedJobConf());
-                       br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));
-               
+               FileSystem fs = IOUtilFunctions.getFileSystem(path);
+               try( BufferedWriter br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true))) ) {
                        for( int i=1; i<=numTasks; i++ )
                                br.write( String.valueOf("CLEANUP TASK 
"+i)+"\n" );
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException("Error writing cleanup 
tasks to taskfile "+path.toString(), ex);
                }
-               finally {
-                       IOUtilFunctions.closeSilently(br);
-               }
        }
        
        public static class CleanupMapper implements Mapper<LongWritable, Text, 
Writable, Writable>

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java 
b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
index 4bbc6a3..137b25e 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
@@ -139,17 +139,16 @@ public class SortMR
                return parts;
     }
 
-    public void configure(JobConf job) {
-      try {
-         FileSystem fs = FileSystem.get(job);
-          Path partFile = new 
Path(MRJobConfiguration.getSortPartitionFilename(job)); 
-          splitPoints = readPartitions(fs, partFile, job);
-        
-      } 
-      catch (IOException ie) {
-        throw new IllegalArgumentException("can't read paritions file", ie);
-      }
-    }
+       public void configure(JobConf job) {
+               try {
+                       Path partFile = new 
Path(MRJobConfiguration.getSortPartitionFilename(job));
+                       FileSystem fs = IOUtilFunctions.getFileSystem(partFile, 
job);
+                       splitPoints = readPartitions(fs, partFile, job);
+               }
+               catch (IOException ie) {
+                       throw new IllegalArgumentException("can't read 
paritions file", ie);
+               }
+       }
 
     public int getPartition(K key, V value, int numPartitions) {
       return findPartition(key)%numPartitions;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
index 6166d2f..85b55cb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
+++ 
b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.CSVReblockMR;
 import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount;
 import org.apache.sysml.runtime.transform.TfUtils;
@@ -91,8 +92,9 @@ public class CSVAssignRowIDMapper extends MapReduceBase 
implements Mapper<LongWr
                        //it doesn't make sense to have repeated file names in 
the input, since this is for reblock
                        thisIndex = 
MRJobConfiguration.getInputMatrixIndexesInMapper(job).get(0);
                        outKey.set(thisIndex);
-                       FileSystem fs = FileSystem.get(job);
-                       Path thisPath = new 
Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs);
+                       Path thisPath = new 
Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE));
+                       FileSystem fs = IOUtilFunctions.getFileSystem(thisPath, 
job);
+                       thisPath = thisPath.makeQualified(fs);
                        filename = thisPath.toString();
                        String[] strs = 
job.getStrings(CSVReblockMR.SMALLEST_FILE_NAME_PER_INPUT);
                        Path headerPath = new 
Path(strs[thisIndex]).makeQualified(fs);


Reply via email to