[SYSTEMML-1664] Fix handling of non-default DFS file URI schemes So far, SystemML's DFS file handling relied entirely on the configured default file system implementation. This created problems when reading directly from object stores (e.g., via swift or s3) if the default Hadoop configuration (with the local file system as the default file system implementation) was on the class path. Similarly, it prevented access to local files for read/write if HDFS was the default.
This patch fixes these issues by properly instantiating the DFS file system handle according to the given file URI scheme, via new utility primitives that ensure consistent behavior across all I/O paths. Furthermore, this patch includes some minor cleanups, such as the removal of unnecessary validity checks for trustworthy file names (which no longer contained any rules, given the generalization of file access policies). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/557aae0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/557aae0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/557aae0f Branch: refs/heads/master Commit: 557aae0ff3e069295da39bb6e280f435e9514cd9 Parents: 88f4a46 Author: Matthias Boehm <[email protected]> Authored: Sun Jun 4 00:47:53 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sun Jun 4 14:01:18 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/DMLScript.java | 14 +- .../org/apache/sysml/api/jmlc/Connection.java | 2 +- .../sysml/api/mlcontext/ScriptFactory.java | 19 +-- .../java/org/apache/sysml/conf/DMLConfig.java | 10 +- .../apache/sysml/hops/recompile/Recompiler.java | 5 +- .../org/apache/sysml/parser/DataExpression.java | 39 ++---- .../org/apache/sysml/parser/ParserWrapper.java | 8 +- .../controlprogram/ParForProgramBlock.java | 4 +- .../parfor/DataPartitionerLocal.java | 12 +- .../parfor/DataPartitionerRemoteReducer.java | 5 +- .../DataPartitionerRemoteSparkReducer.java | 2 +- .../controlprogram/parfor/RemoteDPParForMR.java | 5 +- .../parfor/RemoteParForColocatedFileSplit.java | 18 +-- .../controlprogram/parfor/RemoteParForMR.java | 4 +- .../parfor/ResultMergeLocalFile.java | 18 +-- .../parfor/ResultMergeRemoteMR.java | 7 +- .../instructions/cp/VariableCPInstruction.java | 5 +- .../ParameterizedBuiltinCPFileInstruction.java | 12 
+- .../instructions/spark/RandSPInstruction.java | 20 ++- .../apache/sysml/runtime/io/FrameReader.java | 2 +- .../runtime/io/FrameReaderBinaryBlock.java | 4 +- .../sysml/runtime/io/FrameReaderTextCSV.java | 2 +- .../sysml/runtime/io/FrameReaderTextCell.java | 2 +- .../runtime/io/FrameWriterBinaryBlock.java | 2 +- .../io/FrameWriterBinaryBlockParallel.java | 4 +- .../sysml/runtime/io/FrameWriterTextCSV.java | 4 +- .../runtime/io/FrameWriterTextCSVParallel.java | 4 +- .../sysml/runtime/io/FrameWriterTextCell.java | 2 +- .../runtime/io/FrameWriterTextCellParallel.java | 4 +- .../sysml/runtime/io/IOUtilFunctions.java | 25 ++++ .../apache/sysml/runtime/io/MatrixReader.java | 2 +- .../sysml/runtime/io/ReaderBinaryBlock.java | 4 +- .../runtime/io/ReaderBinaryBlockParallel.java | 2 +- .../sysml/runtime/io/ReaderBinaryCell.java | 2 +- .../apache/sysml/runtime/io/ReaderTextCSV.java | 2 +- .../sysml/runtime/io/ReaderTextCSVParallel.java | 4 +- .../apache/sysml/runtime/io/ReaderTextCell.java | 2 +- .../runtime/io/ReaderTextCellParallel.java | 2 +- .../sysml/runtime/io/WriterBinaryBlock.java | 10 +- .../runtime/io/WriterBinaryBlockParallel.java | 2 +- .../sysml/runtime/io/WriterBinaryCell.java | 8 +- .../sysml/runtime/io/WriterMatrixMarket.java | 26 ++-- .../runtime/io/WriterMatrixMarketParallel.java | 2 +- .../apache/sysml/runtime/io/WriterTextCSV.java | 49 +++---- .../sysml/runtime/io/WriterTextCSVParallel.java | 2 +- .../apache/sysml/runtime/io/WriterTextCell.java | 21 +-- .../runtime/io/WriterTextCellParallel.java | 2 +- .../apache/sysml/runtime/matrix/CleanupMR.java | 15 +- .../org/apache/sysml/runtime/matrix/SortMR.java | 21 ++- .../matrix/mapred/CSVAssignRowIDMapper.java | 6 +- .../runtime/matrix/mapred/CSVReblockMapper.java | 5 +- .../matrix/mapred/MRJobConfiguration.java | 8 +- .../runtime/matrix/sort/CompactInputFormat.java | 4 +- .../matrix/sort/PickFromCompactInputFormat.java | 8 +- .../runtime/transform/ApplyTfBBMapper.java | 6 +- 
.../sysml/runtime/transform/ApplyTfCSVMR.java | 3 +- .../runtime/transform/ApplyTfCSVMapper.java | 3 +- .../sysml/runtime/transform/DataTransform.java | 7 +- .../sysml/runtime/transform/GenTfMtdMR.java | 3 +- .../apache/sysml/runtime/transform/TfUtils.java | 27 ++-- .../sysml/runtime/util/LocalFileUtils.java | 31 ----- .../sysml/runtime/util/MapReduceTool.java | 138 +++++++++++-------- .../apache/sysml/udf/lib/RemoveEmptyRows.java | 3 +- .../org/apache/sysml/yarn/DMLAppMaster.java | 3 +- .../org/apache/sysml/yarn/DMLYarnClient.java | 15 +- .../functions/transform/ScalingTest.java | 5 +- .../org/apache/sysml/test/utils/TestUtils.java | 68 ++++----- 67 files changed, 357 insertions(+), 428 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/api/DMLScript.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java b/src/main/java/org/apache/sysml/api/DMLScript.java index 5d1f7eb..c927250 100644 --- a/src/main/java/org/apache/sysml/api/DMLScript.java +++ b/src/main/java/org/apache/sysml/api/DMLScript.java @@ -639,17 +639,13 @@ public class DMLScript if( fileName.startsWith("hdfs:") || fileName.startsWith("gpfs:") ) { - if( !LocalFileUtils.validateExternalFilename(fileName, true) ) - throw new LanguageException("Invalid (non-trustworthy) hdfs filename."); - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); Path scriptPath = new Path(fileName); + FileSystem fs = IOUtilFunctions.getFileSystem(scriptPath); in = new BufferedReader(new InputStreamReader(fs.open(scriptPath))); } // from local file system else { - if( !LocalFileUtils.validateExternalFilename(fileName, false) ) - throw new LanguageException("Invalid (non-trustworthy) local filename."); in = new BufferedReader(new FileReader(fileName)); } @@ -969,14 +965,6 @@ public class 
DMLScript { LOG.warn("Cannot run map/reduce tasks as user '"+userName+"'. Using tasktracker group '"+ttGroupName+"'."); } - - //validate external filenames working directories - String localtmpdir = config.getTextValue(DMLConfig.LOCAL_TMP_DIR); - String hdfstmpdir = config.getTextValue(DMLConfig.SCRATCH_SPACE); - if( !LocalFileUtils.validateExternalFilename(localtmpdir, false) ) - throw new DMLRuntimeException("Invalid (non-trustworthy) local working directory."); - if( !LocalFileUtils.validateExternalFilename(hdfstmpdir, true) ) - throw new DMLRuntimeException("Invalid (non-trustworthy) hdfs working directory."); } public static void cleanupHadoopExecution( DMLConfig config ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/api/jmlc/Connection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java index dc4ee64..6f25c29 100644 --- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java +++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java @@ -232,8 +232,8 @@ public class Connection implements Closeable if( fname.startsWith("hdfs:") || fname.startsWith("gpfs:") ) { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); Path scriptPath = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(scriptPath); in = new BufferedReader(new InputStreamReader(fs.open(scriptPath))); } // from local file system http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java b/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java index a01dde2..4a30f00 100644 --- 
a/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/ScriptFactory.java @@ -30,8 +30,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.sysml.conf.ConfigurationManager; -import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; /** * Factory for creating DML and PYDML Script objects from strings, files, URLs, @@ -331,13 +330,7 @@ public class ScriptFactory { } String filePath = file.getPath(); try { - if (!LocalFileUtils.validateExternalFilename(filePath, false)) { - throw new MLContextException("Invalid (non-trustworthy) local filename: " + filePath); - } - String scriptString = FileUtils.readFileToString(file); - return scriptString; - } catch (IllegalArgumentException e) { - throw new MLContextException("Error trying to read script string from file: " + filePath, e); + return FileUtils.readFileToString(file); } catch (IOException e) { throw new MLContextException("Error trying to read script string from file: " + filePath, e); } @@ -359,17 +352,11 @@ public class ScriptFactory { } try { if (scriptFilePath.startsWith("hdfs:") || scriptFilePath.startsWith("gpfs:")) { - if (!LocalFileUtils.validateExternalFilename(scriptFilePath, true)) { - throw new MLContextException("Invalid (non-trustworthy) hdfs/gpfs filename: " + scriptFilePath); - } - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); Path path = new Path(scriptFilePath); + FileSystem fs = IOUtilFunctions.getFileSystem(path); FSDataInputStream fsdis = fs.open(path); return IOUtils.toString(fsdis); } else {// from local file system - if (!LocalFileUtils.validateExternalFilename(scriptFilePath, false)) { - throw new MLContextException("Invalid (non-trustworthy) local filename: " + scriptFilePath); - } File scriptFile = new File(scriptFilePath); 
return FileUtils.readFileToString(scriptFile); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/conf/DMLConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/conf/DMLConfig.java b/src/main/java/org/apache/sysml/conf/DMLConfig.java index ccd8889..3a21d95 100644 --- a/src/main/java/org/apache/sysml/conf/DMLConfig.java +++ b/src/main/java/org/apache/sysml/conf/DMLConfig.java @@ -42,7 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.parser.ParseException; import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.util.LocalFileUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; @@ -175,16 +175,12 @@ public class DMLConfig if (_fileName.startsWith("hdfs:") || _fileName.startsWith("gpfs:") ) // config file from DFS { - if( !LocalFileUtils.validateExternalFilename(_fileName, true) ) - throw new IOException("Invalid (non-trustworthy) hdfs config filename."); - FileSystem DFS = FileSystem.get(ConfigurationManager.getCachedJobConf()); - Path configFilePath = new Path(_fileName); + Path configFilePath = new Path(_fileName); + FileSystem DFS = IOUtilFunctions.getFileSystem(configFilePath); domTree = builder.parse(DFS.open(configFilePath)); } else // config from local file system { - if( !LocalFileUtils.validateExternalFilename(_fileName, false) ) - throw new IOException("Invalid (non-trustworthy) local config filename."); domTree = builder.parse(_fileName); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java index 6c8ddea..9942035 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java @@ -30,7 +30,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.apache.wink.json4j.JSONObject; import org.apache.sysml.api.DMLScript; import org.apache.sysml.conf.ConfigurationManager; @@ -1965,10 +1964,8 @@ public class Recompiler { //get meta data filename String mtdname = DataExpression.getMTDFileName(dop.getFileName()); - - JobConf job = ConfigurationManager.getCachedJobConf(); - FileSystem fs = FileSystem.get(job); Path path = new Path(mtdname); + FileSystem fs = IOUtilFunctions.getFileSystem(mtdname); if( fs.exists(path) ){ BufferedReader br = null; try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/parser/DataExpression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java index 51a51c9..47f85f4 100644 --- a/src/main/java/org/apache/sysml/parser/DataExpression.java +++ b/src/main/java/org/apache/sysml/parser/DataExpression.java @@ -37,7 +37,6 @@ import org.apache.sysml.parser.LanguageException.LanguageErrorCodes; import org.apache.sysml.parser.common.CustomErrorListener; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.io.IOUtilFunctions; -import org.apache.sysml.runtime.util.LocalFileUtils; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.utils.JSONHelper; @@ -540,13 +539,7 @@ public class DataExpression extends 
DataIdentifier } public static String getMTDFileName(String inputFileName) throws LanguageException { - String mtdName = inputFileName + ".mtd"; - - //validate read filename - if( !LocalFileUtils.validateExternalFilename(mtdName, true) ) - throw new LanguageException("Invalid (non-trustworthy) hdfs read filename."); - - return mtdName; + return inputFileName + ".mtd"; } /** @@ -1065,11 +1058,6 @@ public class DataExpression extends DataIdentifier }*/ //validate read filename - String fnameWrite = getVarParam(IO_FILENAME).toString(); - if( !LocalFileUtils.validateExternalFilename(fnameWrite, true) ) //always unconditional - raiseValidateError("Invalid (non-trustworthy) hdfs write filename.", false); - - if (getVarParam(FORMAT_TYPE) == null || getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase("text")) getOutput().setBlockDimensions(-1, -1); else if (getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase("binary")) @@ -1923,7 +1911,7 @@ public class DataExpression extends DataIdentifier continue; BufferedReader br = null; try { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + FileSystem fs = IOUtilFunctions.getFileSystem(childPath); br = new BufferedReader(new InputStreamReader(fs.open(childPath))); JSONObject childObj = JSONHelper.parse(br); for( Object obj : childObj.entrySet() ){ @@ -1947,8 +1935,9 @@ public class DataExpression extends DataIdentifier { BufferedReader br = null; try { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename)))); + Path path = new Path(filename); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + br = new BufferedReader(new InputStreamReader(fs.open(path))); retVal = new JSONObject(br); } catch (Exception e){ @@ -1972,19 +1961,16 @@ public class DataExpression extends DataIdentifier try { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - Path pt = new Path(filename); - if 
(fs.exists(pt)){ - exists = true; - } - - boolean getFileStatusIsDir = fs.getFileStatus(pt).isDirectory(); + Path path = new Path(filename); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + exists = fs.exists(path); + boolean getFileStatusIsDir = fs.getFileStatus(path).isDirectory(); if (exists && getFileStatusIsDir){ raiseValidateError("MatrixMarket files as directories not supported", conditional); } else if (exists) { - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(pt))); + BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(path))); try { retVal[0] = in.readLine(); @@ -2027,8 +2013,9 @@ public class DataExpression extends DataIdentifier { BufferedReader in = null; try { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - in = new BufferedReader(new InputStreamReader(fs.open(new Path(inputFileName)))); + Path path = new Path(inputFileName); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + in = new BufferedReader(new InputStreamReader(fs.open(path))); String headerLine = new String(""); if (in.ready()) headerLine = in.readLine(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/parser/ParserWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/ParserWrapper.java b/src/main/java/org/apache/sysml/parser/ParserWrapper.java index 2711b6a..9a32b00 100644 --- a/src/main/java/org/apache/sysml/parser/ParserWrapper.java +++ b/src/main/java/org/apache/sysml/parser/ParserWrapper.java @@ -33,10 +33,8 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.parser.common.CustomErrorListener.ParseIssue; import org.apache.sysml.runtime.io.IOUtilFunctions; -import 
org.apache.sysml.runtime.util.LocalFileUtils; /** * Base class for all dml parsers in order to make the various compilation chains @@ -100,18 +98,14 @@ public abstract class ParserWrapper { || script.startsWith("gpfs:") ) { LOG.debug("Looking for the following file in HDFS or GPFS: " + script); - if( !LocalFileUtils.validateExternalFilename(script, true) ) - throw new LanguageException("Invalid (non-trustworthy) hdfs filename."); - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); Path scriptPath = new Path(script); + FileSystem fs = IOUtilFunctions.getFileSystem(scriptPath); in = new BufferedReader(new InputStreamReader(fs.open(scriptPath))); } // from local file system else { LOG.debug("Looking for the following file in the local file system: " + script); - if( !LocalFileUtils.validateExternalFilename(script, false) ) - throw new LanguageException("Invalid (non-trustworthy) local filename."); if (Files.exists(Paths.get(script))) in = new BufferedReader(new FileReader(script)); else // check in scripts/ directory for file (useful for tests) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index f3de422..c9dcc22 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -1633,7 +1633,7 @@ public class ParForProgramBlock extends ForProgramBlock try { Path path = new Path(fname); - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + FileSystem fs = IOUtilFunctions.getFileSystem(path); br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); boolean 
flagFirst = true; //workaround for keeping gen order @@ -1662,7 +1662,7 @@ public class ParForProgramBlock extends ForProgramBlock try { Path path = new Path( fname ); - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + FileSystem fs = IOUtilFunctions.getFileSystem(path); br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); Task t = null; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java index eb273b6..f0b5674 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java @@ -239,7 +239,7 @@ public class DataPartitionerLocal extends DataPartitioner //check and add input path JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fname); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //prepare sequence file reader, and write to local staging area LinkedList<Cell> buffer = new LinkedList<Cell>(); @@ -327,7 +327,7 @@ public class DataPartitionerLocal extends DataPartitioner //check and add input path JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fname); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //prepare sequence file reader, and write to local staging area MatrixIndexes key = new MatrixIndexes(); @@ -400,7 +400,7 @@ public class DataPartitionerLocal extends DataPartitioner //check and add input path JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fname); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //prepare sequence file reader, and write to local staging area MatrixIndexes key = new MatrixIndexes(); @@ -601,8 +601,8 @@ public class DataPartitionerLocal extends DataPartitioner throws IOException { long key = getKeyFromFilePath(lpdir); - FileSystem fs = FileSystem.get(job); Path path = new Path(dir+"/"+key); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); //beware ca 50ms try @@ -637,8 +637,8 @@ public class DataPartitionerLocal extends DataPartitioner throws IOException { long key = getKeyFromFilePath(lpdir); - FileSystem fs = FileSystem.get(job); Path path = new Path(dir+"/"+key); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms try @@ -667,8 +667,8 @@ public class DataPartitionerLocal extends DataPartitioner throws IOException { long key = getKeyFromFilePath(lpdir); - FileSystem fs = FileSystem.get(job); Path path = new Path(dir+"/"+key); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); try { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java index d493e90..21d5a64 100644 --- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java @@ -104,8 +104,9 @@ public class DataPartitionerRemoteReducer _fnameNew = fnameNew; try { - _fs = FileSystem.get(_job); - } catch (IOException e) { + _fs = IOUtilFunctions.getFileSystem(new Path(fnameNew), job); + } + catch (IOException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java index 174c544..ad228eb 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java @@ -66,8 +66,8 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo { //create sequence file writer Configuration job = new Configuration(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path(_fnameNew + File.separator + key); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class, job.getInt(MRConfigurationNames.IO_FILE_BUFFER_SIZE, 4096), (short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- 
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index a8fda17..60d23c6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -247,20 +247,19 @@ public class RemoteDPParForMR { HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>(); - FileSystem fs = FileSystem.get(job); Path path = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); LongWritable key = new LongWritable(); //workerID Text value = new Text(); //serialized var header (incl filename) int countAll = 0; for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) ) { - SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(job),lpath,job); + SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try { while( reader.next(key, value) ) { - //System.out.println("key="+key.get()+", value="+value.toString()); if( !tmp.containsKey( key.get() ) ) tmp.put(key.get(), new LocalVariableMap ()); Object[] dat = ProgramConverter.parseDataObject( value.toString() ); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java index e4895cb..ade08e6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java @@ -81,7 +81,7 @@ public class 
RemoteParForColocatedFileSplit extends FileSplit //time.start(); JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(getPath(), job); //read task string LongWritable key = new LongWritable(); @@ -126,23 +126,7 @@ public class RemoteParForColocatedFileSplit extends FileSplit for( BlockLocation bl : tmp1 ) countHosts(hosts, bl.getHosts()); } - - /* - int lFrom = t.getIterations().get(0).getIntValue(); - int lTo = t.getIterations().get(1).getIntValue(); - int lIncr = t.getIterations().get(2).getIntValue(); - for( int i=lFrom; i<=lTo; i+=lIncr ) - { - String fname = _fname+"/"+String.valueOf( ((i-_offset)/_blen+_offset) ); - FileSystem fs = FileSystem.get(job); - FileStatus status = fs.getFileStatus(new Path(fname)); - BlockLocation[] tmp1 = fs.getFileBlockLocations(status, 0, status.getLen()); - for( BlockLocation bl : tmp1 ) - countHosts(hosts, bl.getHosts()); - }*/ } - - //System.out.println("Get locations "+time.stop()+""); //majority consensus on top host return getTopHosts(hosts); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java index 465136d..67c3368 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java @@ -258,15 +258,15 @@ public class RemoteParForMR { HashMap<Long,LocalVariableMap> tmp = new HashMap<Long,LocalVariableMap>(); - FileSystem fs = FileSystem.get(job); Path path = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); LongWritable key = new LongWritable(); 
//workerID Text value = new Text(); //serialized var header (incl filename) int countAll = 0; for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) ) { - SequenceFile.Reader reader = new SequenceFile.Reader(FileSystem.get(job),lpath,job); + SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); try { while( reader.next(key, value) ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index 8e201d0..7dae670 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -222,8 +222,8 @@ public class ResultMergeLocalFile extends ResultMerge //actual merge JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fnameNew ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); String valueStr = null; @@ -324,8 +324,8 @@ public class ResultMergeLocalFile extends ResultMerge //actual merge JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fnameNew ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); SequenceFile.Writer out = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms MatrixIndexes key = new MatrixIndexes(); @@ -474,8 +474,8 @@ public class ResultMergeLocalFile extends ResultMerge MatrixBlock value = new MatrixBlock(); JobConf tmpJob = new 
JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(tmpJob); Path tmpPath = new Path(mo.getFileName()); + FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob); for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath)) { @@ -566,7 +566,7 @@ public class ResultMergeLocalFile extends ResultMerge { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(mo.getFileName()); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); LinkedList<Cell> buffer = new LinkedList<Cell>(); MatrixIndexes key = new MatrixIndexes(); @@ -649,8 +649,8 @@ public class ResultMergeLocalFile extends ResultMerge throws IOException, DMLRuntimeException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fnameNew ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); MatrixCharacteristics mc = metadata.getMatrixCharacteristics(); long rlen = mc.getRows(); @@ -745,8 +745,8 @@ public class ResultMergeLocalFile extends ResultMerge throws IOException, DMLRuntimeException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fnameNew ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); MatrixCharacteristics mc = metadata.getMatrixCharacteristics(); long rlen = mc.getRows(); @@ -882,8 +882,8 @@ public class ResultMergeLocalFile extends ResultMerge throws IOException, DMLRuntimeException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fnameNew ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); MatrixCharacteristics mc = metadata.getMatrixCharacteristics(); long rlen = mc.getRows(); @@ -1009,9 +1009,9 @@ public class ResultMergeLocalFile extends ResultMerge throws CacheException, IOException { JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fnameNew ); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + //create output dir fs.mkdirs(path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java index 98a063e..e2377c4 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java @@ -39,6 +39,7 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; import org.apache.sysml.runtime.matrix.data.InputInfo; @@ -194,8 +195,10 @@ public class ResultMergeRemoteMR extends ResultMerge ///// //configure the MR job if( withCompare ) { - pathCompare = new Path(fname).makeQualified(FileSystem.get(job)); - MRJobConfiguration.setResultMergeInfo(job, pathCompare.toString(), ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, brlen, bclen); + FileSystem fs = IOUtilFunctions.getFileSystem(pathNew, job); + pathCompare = new Path(fname).makeQualified(fs); + MRJobConfiguration.setResultMergeInfo(job, pathCompare.toString(), ii, + LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, brlen, bclen); } 
else MRJobConfiguration.setResultMergeInfo(job, "null", ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, bclen, bclen); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java index 907c102..4976f26 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/VariableCPInstruction.java @@ -25,9 +25,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.api.DMLScript; -import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.lops.Lop; import org.apache.sysml.lops.UnaryCP; import org.apache.sysml.parser.Expression.DataType; @@ -899,8 +897,7 @@ public class VariableCPInstruction extends CPInstruction MapReduceTool.writeObjectToHDFS(scalar.getValue(), fname); MapReduceTool.writeScalarMetaDataFile(fname +".mtd", input1.getValueType()); - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(fname); if (fs instanceof LocalFileSystem) { Path path = new Path(fname); IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java index 715bf4f..6a7127f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java @@ -267,7 +267,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC //prepare input JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fnameOld); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); if( !fs.exists(path) ) throw new IOException("File "+fnameOld+" does not exist on HDFS."); FileInputFormat.addInputPath(job, path); @@ -321,7 +321,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC //prepare input JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fnameOld); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); if( !fs.exists(path) ) throw new IOException("File "+fnameOld+" does not exist on HDFS."); @@ -379,7 +379,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC //prepare input JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fnameOld); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); if( !fs.exists(path) ) throw new IOException("File "+fnameOld+" does not exist on HDFS."); @@ -631,7 +631,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC //prepare input JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fnameNew); - FileSystem fs = FileSystem.get(job); + FileSystem fs = 
IOUtilFunctions.getFileSystem(path, job); String metaOut = stagingDir+"/meta"; //prepare output @@ -732,7 +732,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC //prepare input JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fnameNew); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); String metaOut = stagingDir+"/meta"; //prepare output @@ -883,7 +883,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC //prepare input JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fnameNew); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); String metaOut = stagingDir+"/meta"; //prepare output http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java index b2f3503..b075076 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java @@ -43,7 +43,6 @@ import scala.Tuple2; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; -import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.DataGenOp; import org.apache.sysml.hops.Hop.DataGenMethod; import org.apache.sysml.hops.OptimizerUtils; @@ -385,12 +384,12 @@ public class RandSPInstruction extends UnarySPInstruction //b) file-based seed rdd construction (for robustness wrt large number of blocks) else { - String path = LibMatrixDatagen.generateUniqueSeedPath(dir); + Path 
path = new Path(LibMatrixDatagen.generateUniqueSeedPath(dir)); PrintWriter pw = null; try { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - pw = new PrintWriter(fs.create(new Path(path))); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + pw = new PrintWriter(fs.create(path)); StringBuilder sb = new StringBuilder(); for( long i=0; i<numBlocks; i++ ) { sb.append(1 + i/numColBlocks); @@ -416,7 +415,7 @@ public class RandSPInstruction extends UnarySPInstruction //create seeds rdd seedsRDD = sec.getSparkContext() - .textFile(path, numPartitions) + .textFile(path.toString(), numPartitions) .mapToPair(new ExtractSeedTuple()); } @@ -476,13 +475,12 @@ public class RandSPInstruction extends UnarySPInstruction //b) file-based offset rdd construction (for robustness wrt large number of blocks) else { - String path = LibMatrixDatagen.generateUniqueSeedPath(dir); + Path path = new Path(LibMatrixDatagen.generateUniqueSeedPath(dir)); PrintWriter pw = null; - try - { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - pw = new PrintWriter(fs.create(new Path(path))); + try { + FileSystem fs = IOUtilFunctions.getFileSystem(path); + pw = new PrintWriter(fs.create(path)); for( long i=0; i<numBlocks; i++ ) { double off = seq_from + seq_incr*i*rowsInBlock; pw.println(off); @@ -500,7 +498,7 @@ public class RandSPInstruction extends UnarySPInstruction //create seeds rdd offsetsRDD = sec.getSparkContext() - .textFile(path, numPartitions) + .textFile(path.toString(), numPartitions) .map(new ExtractOffsetTuple()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java index 321735d..03271d8 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java @@ -153,7 +153,7 @@ public abstract class FrameReader throw new IOException("File "+path.toString()+" does not exist on HDFS/LFS."); //check for empty file - if( MapReduceTool.isFileEmpty( fs, path.toString() ) ) + if( MapReduceTool.isFileEmpty(fs, path) ) throw new EOFException("Empty input file "+ path.toString() +"."); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java index 32feea3..e08aa96 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java @@ -50,8 +50,8 @@ public class FrameReaderBinaryBlock extends FrameReader //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); @@ -128,8 +128,8 @@ public class FrameReaderBinaryBlock extends FrameReader { //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); LongWritable key = new LongWritable(); FrameBlock value = new FrameBlock(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java index e86cbe2..9e10f2c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java @@ -62,8 +62,8 @@ public class FrameReaderTextCSV extends FrameReader { //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); FileInputFormat.addInputPath(job, path); //check existence and non-empty file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java index 548452f..55128f8 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java @@ -58,8 +58,8 @@ public class FrameReaderTextCell extends FrameReader //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java index 563aeb0..819b7d0 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java @@ -63,7 +63,7 @@ public class FrameWriterBinaryBlock extends FrameWriter protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen ) throws IOException, DMLRuntimeException { - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path); int blen = ConfigurationManager.getBlocksize(); //sequential write to single file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java index 4a8c748..a25fe75 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java @@ -65,8 +65,8 @@ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock } //create directory for concurrent tasks - MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); - FileSystem fs = FileSystem.get(job); + MapReduceTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + FileSystem fs = IOUtilFunctions.getFileSystem(path); //create and execute write tasks try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java index 24bdfce..83f3861 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java @@ -74,8 +74,8 @@ public class FrameWriterTextCSV extends FrameWriter protected void writeCSVFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen, CSVFileFormatProperties csvprops ) throws IOException { - FileSystem fs = FileSystem.get(job); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + //sequential write to single text file writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, csvprops); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java index 9a3a33d..10f0827 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java @@ -67,8 +67,8 @@ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV } //create directory for concurrent tasks - MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); - FileSystem fs = FileSystem.get(job); + MapReduceTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //create and execute tasks try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java index 0768d4a..7263e7a 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java @@ -63,7 +63,7 @@ public class FrameWriterTextCell extends FrameWriter protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen ) throws IOException { - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //sequential write to single text file writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java index 8a32952..8eed53c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java @@ -63,8 +63,8 @@ public class FrameWriterTextCellParallel extends FrameWriterTextCell } //create directory for concurrent tasks - MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); - FileSystem fs = FileSystem.get(job); + MapReduceTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //create and execute tasks try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index df8174f..44d9aee 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -32,6 +32,7 @@ import org.apache.commons.io.input.ReaderInputStream; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.LocalFileUtils; import org.apache.sysml.runtime.util.UtilFunctions; @@ -53,6 +55,29 @@ public class IOUtilFunctions private static final char CSV_QUOTE_CHAR = '"'; + /** Returns the file system for the given file name; wraps the name in a Path and uses the cached job configuration. */ public static FileSystem getFileSystem(String fname) throws IOException { + return getFileSystem(new Path(fname), + ConfigurationManager.getCachedJobConf()); + } + + /** Returns the file system for the given path, using the cached job configuration. */ public static FileSystem getFileSystem(Path fname) throws IOException { + return getFileSystem(fname, + ConfigurationManager.getCachedJobConf()); + } + + /** Returns the file system obtained from FileSystem.get for the URI of the given path and the given configuration. */ public static FileSystem getFileSystem(Path fname, Configuration conf) throws IOException { + return FileSystem.get(fname.toUri(), conf); + } + + /** Returns true if both paths carry the same URI scheme (or both carry none); returns false if either path or its URI is null. */ public static boolean isSameFileScheme(Path path1, Path path2) { + if( path1 == null || path2 == null || path1.toUri() == null || path2.toUri() == null) + return false; + String scheme1 = path1.toUri().getScheme(); + String scheme2 = path2.toUri().getScheme(); + return (scheme1 == null && scheme2 == null) + || (scheme1 != null && scheme1.equals(scheme2)); /* null-safe: a scheme present on only one side means the schemes differ, not an NPE */ + } + public static void closeSilently( Closeable io ) { try { if( io != null ) 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java index 8e624a6..dbd1602 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java @@ -134,7 +134,7 @@ public abstract class MatrixReader throw new IOException("File "+path.toString()+" does not exist on HDFS/LFS."); //check for empty file - if( MapReduceTool.isFileEmpty( fs, path.toString() ) ) + if( MapReduceTool.isFileEmpty(fs, path) ) throw new EOFException("Empty input file "+ path.toString() +"."); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java index 4c8549e..16241c9 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java @@ -58,8 +58,8 @@ public class ReaderBinaryBlock extends MatrixReader //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = _localFS ? FileSystem.getLocal(job) : FileSystem.get(job); Path path = new Path( (_localFS ? "file:///" : "") + fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); @@ -90,8 +90,8 @@ public class ReaderBinaryBlock extends MatrixReader //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = _localFS ? 
FileSystem.getLocal(job) : FileSystem.get(job); Path path = new Path( (_localFS ? "file:///" : "") + fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java index 387eac8..2afdb0e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java @@ -60,8 +60,8 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = _localFS ? FileSystem.getLocal(job) : FileSystem.get(job); Path path = new Path( (_localFS ? 
"file:///" : "") + fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java index dcf9e7b..2bdc25e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java @@ -44,8 +44,8 @@ public class ReaderBinaryCell extends MatrixReader //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java index 6256955..27637ab 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java @@ -60,8 +60,8 @@ public class ReaderTextCSV extends MatrixReader //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 75b3bd9..88388f4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -74,9 +74,9 @@ public class ReaderTextCSVParallel extends MatrixReader { // prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path(fname); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + FileInputFormat.addInputPath(job, path); TextInputFormat informat = new TextInputFormat(); informat.configure(job); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java index 3b93c33..fd3af44 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java @@ -59,8 +59,8 @@ public class ReaderTextCell extends MatrixReader //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index 9501b6d..5e0e043 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -83,8 +83,8 @@ public class ReaderTextCellParallel extends MatrixReader { //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file checkValidInputFile(fs, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java index cdc49fb..78ffc0b 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java @@ -50,9 +50,9 @@ public class WriterBinaryBlock extends MatrixWriter { //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + //if the file already exists on HDFS, remove it. 
MapReduceTool.deleteFileIfExistOnHDFS( fname ); @@ -76,8 +76,8 @@ public class WriterBinaryBlock extends MatrixWriter { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path( fname ); - FileSystem fs = FileSystem.get(job); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + SequenceFile.Writer writer = null; try { writer = new SequenceFile.Writer(fs, job, path, @@ -268,7 +268,7 @@ public class WriterBinaryBlock extends MatrixWriter throws IOException, DMLRuntimeException { boolean sparse = src.isInSparseFormat(); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //set up preferred custom serialization framework for binary block format if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java index ccd897c..6f33011 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java @@ -64,7 +64,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock } //create directory for concurrent tasks - MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + MapReduceTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); //create and execute write tasks try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java 
b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java index c451f39..a072a6b 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java @@ -51,7 +51,7 @@ public class WriterBinaryCell extends MatrixWriter //core write writeBinaryCellMatrixToHDFS(path, job, src, rlen, clen, brlen, bclen); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } @@ -62,8 +62,8 @@ public class WriterBinaryCell extends MatrixWriter { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path( fname ); - FileSystem fs = FileSystem.get(job); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + SequenceFile.Writer writer = null; try { writer = new SequenceFile.Writer(fs, job, path, @@ -86,7 +86,7 @@ public class WriterBinaryCell extends MatrixWriter { boolean sparse = src.isInSparseFormat(); boolean entriesWritten = false; - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); MatrixIndexes indexes = new MatrixIndexes(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java index 9838d01..a2e2013 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java @@ -52,9 +52,9 @@ public class WriterMatrixMarket extends MatrixWriter //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); 
- FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + //if the file already exists on HDFS, remove it. MapReduceTool.deleteFileIfExistOnHDFS( fname ); @@ -69,7 +69,7 @@ public class WriterMatrixMarket extends MatrixWriter throws IOException, DMLRuntimeException { Path path = new Path( fname ); - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + FileSystem fs = IOUtilFunctions.getFileSystem(path); FSDataOutputStream writer = null; try { @@ -172,13 +172,13 @@ public class WriterMatrixMarket extends MatrixWriter Path src = new Path (srcFileName); Path merge = new Path (fileName); - FileSystem hdfs = FileSystem.get(conf); - - if (hdfs.exists (merge)) { - hdfs.delete(merge, true); + FileSystem fs = IOUtilFunctions.getFileSystem(src, conf); + + if (fs.exists (merge)) { + fs.delete(merge, true); } - OutputStream out = hdfs.create(merge, true); + OutputStream out = fs.create(merge, true); // write out the header first StringBuilder sb = new StringBuilder(); @@ -189,12 +189,12 @@ public class WriterMatrixMarket extends MatrixWriter out.write (sb.toString().getBytes()); // if the source is a directory - if (hdfs.getFileStatus(src).isDirectory()) { + if (fs.getFileStatus(src).isDirectory()) { try { - FileStatus[] contents = hdfs.listStatus(src); + FileStatus[] contents = fs.listStatus(src); for (int i = 0; i < contents.length; i++) { if (!contents[i].isDirectory()) { - InputStream in = hdfs.open (contents[i].getPath()); + InputStream in = fs.open (contents[i].getPath()); try { IOUtils.copyBytes (in, out, conf, false); } finally { @@ -205,10 +205,10 @@ public class WriterMatrixMarket extends MatrixWriter } finally { IOUtilFunctions.closeSilently(out); } - } else if (hdfs.isFile(src)) { + } else if (fs.isFile(src)) { InputStream in = null; try { - in = hdfs.open (src); + in = fs.open (src); IOUtils.copyBytes (in, out, conf, true); } finally { 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java index f3c1ce8..1fed377 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java @@ -62,7 +62,7 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket } //create directory for concurrent tasks - MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + MapReduceTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); //create and execute tasks try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java index 479ce40..6c20baa 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java @@ -64,9 +64,9 @@ public class WriterTextCSV extends MatrixWriter //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + //if the file already exists on HDFS, remove it. 
MapReduceTool.deleteFileIfExistOnHDFS( fname ); @@ -81,9 +81,9 @@ public class WriterTextCSV extends MatrixWriter throws IOException, DMLRuntimeException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + MatrixBlock src = new MatrixBlock((int)rlen, 1, true); writeCSVMatrixToHDFS(path, job, fs, src, _props); @@ -245,7 +245,7 @@ public class WriterTextCSV extends MatrixWriter Path srcFilePath = new Path(srcFileName); Path destFilePath = new Path(destFileName); - FileSystem hdfs = FileSystem.get(conf); + FileSystem fs = IOUtilFunctions.getFileSystem(srcFilePath, conf); if ( !_props.hasHeader() ) { // simply move srcFile to destFile @@ -259,22 +259,17 @@ public class WriterTextCSV extends MatrixWriter */ // delete the destination file, if exists already - //boolean ret1 = - hdfs.delete(destFilePath, true); + fs.delete(destFilePath, true); // Create /user/biadmin/csv/temp/out/file.csv so that ..../temp/out/ is created. 
- //boolean ret2 = - hdfs.createNewFile(destFilePath); + fs.createNewFile(destFilePath); // delete the file "file.csv" but preserve the directory structure /user/biadmin/csv/temp/out/ - //boolean ret3 = - hdfs.delete(destFilePath, true); + fs.delete(destFilePath, true); // finally, move the data to destFilePath = /user/biadmin/csv/temp/out/file.csv - //boolean ret4 = - hdfs.rename(srcFilePath, destFilePath); + fs.rename(srcFilePath, destFilePath); - //System.out.println("Return values = del:" + ret1 + ", createNew:" + ret2 + ", del:" + ret3 + ", rename:" + ret4); return; } @@ -287,11 +282,11 @@ public class WriterTextCSV extends MatrixWriter } sb.append('\n'); - if (hdfs.isDirectory(srcFilePath)) { + if (fs.isDirectory(srcFilePath)) { // compute sorted order among part files ArrayList<Path> files=new ArrayList<Path>(); - for(FileStatus stat: hdfs.listStatus(srcFilePath, CSVReblockMR.hiddenFileFilter)) + for(FileStatus stat: fs.listStatus(srcFilePath, CSVReblockMR.hiddenFileFilter)) files.add(stat.getPath()); Collections.sort(files); @@ -300,14 +295,14 @@ public class WriterTextCSV extends MatrixWriter // create a temp file, and add header and contents of first part Path tmp = new Path(firstpart.toString() + ".tmp"); - OutputStream out = hdfs.create(tmp, true); + OutputStream out = fs.create(tmp, true); out.write(sb.toString().getBytes()); sb.setLength(0); // copy rest of the data from firstpart InputStream in = null; try { - in = hdfs.open(firstpart); + in = fs.open(firstpart); IOUtils.copyBytes(in, out, conf, true); } finally { IOUtilFunctions.closeSilently(in); @@ -315,18 +310,18 @@ public class WriterTextCSV extends MatrixWriter } // rename tmp to firstpart - hdfs.delete(firstpart, true); - hdfs.rename(tmp, firstpart); + fs.delete(firstpart, true); + fs.rename(tmp, firstpart); // rename srcfile to destFile - hdfs.delete(destFilePath, true); - hdfs.createNewFile(destFilePath); // force the creation of directory structure - hdfs.delete(destFilePath, true); // 
delete the file, but preserve the directory structure - hdfs.rename(srcFilePath, destFilePath); // move the data + fs.delete(destFilePath, true); + fs.createNewFile(destFilePath); // force the creation of directory structure + fs.delete(destFilePath, true); // delete the file, but preserve the directory structure + fs.rename(srcFilePath, destFilePath); // move the data - } else if (hdfs.isFile(srcFilePath)) { + } else if (fs.isFile(srcFilePath)) { // create destination file - OutputStream out = hdfs.create(destFilePath, true); + OutputStream out = fs.create(destFilePath, true); // write header out.write(sb.toString().getBytes()); @@ -335,7 +330,7 @@ public class WriterTextCSV extends MatrixWriter // copy the data from srcFile InputStream in = null; try { - in = hdfs.open(srcFilePath); + in = fs.open(srcFilePath); IOUtils.copyBytes(in, out, conf, true); } finally { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java index eb82df1..581225f 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java @@ -64,7 +64,7 @@ public class WriterTextCSVParallel extends WriterTextCSV } //create directory for concurrent tasks - MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + MapReduceTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); //create and execute tasks try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java index 9389bdc..432d363 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java @@ -47,9 +47,9 @@ public class WriterTextCell extends MatrixWriter //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); - + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + //if the file already exists on HDFS, remove it. MapReduceTool.deleteFileIfExistOnHDFS( fname ); @@ -64,16 +64,10 @@ public class WriterTextCell extends MatrixWriter throws IOException, DMLRuntimeException { Path path = new Path( fname ); - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - - FSDataOutputStream writer = null; - try{ - writer = fs.create(path); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + try( FSDataOutputStream writer = fs.create(path) ){ writer.writeBytes("1 1 0"); } - finally{ - IOUtilFunctions.closeSilently(writer); - } IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } @@ -90,10 +84,8 @@ public class WriterTextCell extends MatrixWriter { boolean sparse = src.isInSparseFormat(); int clen = src.getNumColumns(); - - BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - try + try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) { //for obj reuse and preventing repeated buffer re-allocations StringBuilder sb = new StringBuilder(); @@ -144,8 +136,5 @@ public class WriterTextCell extends MatrixWriter br.write("1 1 0\n"); } } - finally { - IOUtilFunctions.closeSilently(br); - } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java index 208a9c8..1c08459 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java @@ -61,7 +61,7 @@ public class WriterTextCellParallel extends WriterTextCell } //create directory for concurrent tasks - MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + MapReduceTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); //create and execute tasks try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java index 3dee28d..118016a 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java @@ -40,7 +40,6 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.lib.NLineInputFormat; import org.apache.hadoop.mapred.lib.NullOutputFormat; -import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -110,22 +109,14 @@ public class CleanupMR private static void writeCleanupTasksToFile(Path path, int numTasks) throws DMLRuntimeException, IOException { - BufferedWriter br = null; - try - { - FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true))); - + FileSystem fs = IOUtilFunctions.getFileSystem(path); + try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) { for( int i=1; i<=numTasks; i++ ) br.write( String.valueOf("CLEANUP TASK "+i)+"\n" ); } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException("Error writing cleanup tasks to taskfile "+path.toString(), ex); } - finally { - IOUtilFunctions.closeSilently(br); - } } public static class CleanupMapper implements Mapper<LongWritable, Text, Writable, Writable> http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java index 4bbc6a3..137b25e 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java @@ -139,17 +139,16 @@ public class SortMR return parts; } - public void configure(JobConf job) { - try { - FileSystem fs = FileSystem.get(job); - Path partFile = new Path(MRJobConfiguration.getSortPartitionFilename(job)); - splitPoints = readPartitions(fs, partFile, job); - - } - catch (IOException ie) { - throw new IllegalArgumentException("can't read paritions file", ie); - } - } + public void configure(JobConf job) { + try { + Path partFile = new Path(MRJobConfiguration.getSortPartitionFilename(job)); + FileSystem fs = IOUtilFunctions.getFileSystem(partFile, job); + splitPoints = readPartitions(fs, partFile, job); + } + catch (IOException ie) { + throw new IllegalArgumentException("can't read paritions file", ie); + } + } public int getPartition(K key, V value, int numPartitions) { return findPartition(key)%numPartitions; 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java index 6166d2f..85b55cb 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVAssignRowIDMapper.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.CSVReblockMR; import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; import org.apache.sysml.runtime.transform.TfUtils; @@ -91,8 +92,9 @@ public class CSVAssignRowIDMapper extends MapReduceBase implements Mapper<LongWr //it doesn't make sense to have repeated file names in the input, since this is for reblock thisIndex = MRJobConfiguration.getInputMatrixIndexesInMapper(job).get(0); outKey.set(thisIndex); - FileSystem fs = FileSystem.get(job); - Path thisPath = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs); + Path thisPath = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)); + FileSystem fs = IOUtilFunctions.getFileSystem(thisPath, job); + thisPath = thisPath.makeQualified(fs); filename = thisPath.toString(); String[] strs = job.getStrings(CSVReblockMR.SMALLEST_FILE_NAME_PER_INPUT); Path headerPath = new Path(strs[thisIndex]).makeQualified(fs);
