Repository: incubator-systemml Updated Branches: refs/heads/master 88f4a468f -> 557aae0ff
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java index 8fcc725..aedd2b0 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java @@ -152,8 +152,9 @@ public class CSVReblockMapper extends MapperBase implements Mapper<LongWritable, byte matrixIndex=representativeMatrixes.get(0); try { - FileSystem fs = FileSystem.get(job); - Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs); + Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)); + FileSystem fs = IOUtilFunctions.getFileSystem(thisPath, job); + thisPath = thisPath.makeQualified(fs); String filename=thisPath.toString(); Path headerPath=new Path(job.getStrings(CSVReblockMR.SMALLEST_FILE_NAME_PER_INPUT)[matrixIndex]).makeQualified(fs); if(headerPath.toString().equals(filename)) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java index a7d881d..50a7412 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/MRJobConfiguration.java @@ -70,6 +70,7 @@ import org.apache.sysml.runtime.instructions.mr.ReblockInstruction; import org.apache.sysml.runtime.instructions.mr.RemoveEmptyMRInstruction; import 
org.apache.sysml.runtime.instructions.mr.UnaryMRInstructionBase; import org.apache.sysml.runtime.io.BinaryBlockSerialization; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.AddDummyWeightConverter; import org.apache.sysml.runtime.matrix.data.BinaryBlockToBinaryCellConverter; @@ -748,10 +749,9 @@ public class MRJobConfiguration for(int i=0; i<matrices.length; i++) matrices[i]=new Path(matrices[i]).toString(); - FileSystem fs=FileSystem.get(job); - Path thisFile=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs); - - //Path p=new Path(thisFileName); + Path thisFile=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)); + FileSystem fs = IOUtilFunctions.getFileSystem(thisFile, job); + thisFile = thisFile.makeQualified(fs); Path thisDir=thisFile.getParent().makeQualified(fs); ArrayList<Byte> representativeMatrixes=new ArrayList<Byte>(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java index a0faa15..350f65b 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java @@ -77,9 +77,9 @@ public class CompactInputFormat<K extends WritableComparable, V extends Writable @SuppressWarnings("unchecked") public CompactInputRecordReader(JobConf job, FileSplit split) throws IOException { - fs = FileSystem.get(job); path = split.getPath(); - totLength = split.getLength(); + fs = IOUtilFunctions.getFileSystem(path, job); + totLength = split.getLength(); currentStream = fs.open(path); keyClass=(Class<? 
extends WritableComparable>) job.getClass(KEY_CLASS, WritableComparable.class); valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java index 3559593..04899f3 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java @@ -242,7 +242,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M // check if the current part file needs to be processed path = split.getPath(); totLength = split.getLength(); - currentStream = FileSystem.get(job).open(path); + currentStream = IOUtilFunctions.getFileSystem(path, job).open(path); currPart = getIndexInTheArray(path.getName()); if ( currPart < beginPart || currPart > endPart ) { @@ -394,9 +394,9 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M public PickRecordReader(JobConf job, FileSplit split) throws IOException { - fs = FileSystem.get(job); - path = split.getPath(); - currentStream = fs.open(path); + path = split.getPath(); + fs = IOUtilFunctions.getFileSystem(path, job); + currentStream = fs.open(path); int partIndex=getIndexInTheArray(path.getName()); String arrStr=job.get(SELECTED_POINTS_PREFIX+partIndex); if(arrStr==null || arrStr.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java index 6d9cb1f..77c06ae 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java @@ -72,9 +72,9 @@ public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, OffsetCount value=new OffsetCount(); Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME)); - FileSystem fs = FileSystem.get(job); - Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs); - String thisfile=thisPath.toString(); + Path path=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + String thisfile=path.makeQualified(fs).toString(); SequenceFile.Reader reader = null; try { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java index 3ee0e76..e2885d8 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMR.java @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.CSVReblockMR; import org.apache.sysml.runtime.matrix.JobReturn; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -80,7 +81,7 @@ public class ApplyTfCSVMR { FileInputFormat.addInputPath(job, new Path(inputPath)); // delete outputPath, if 
exists already. Path outPath = new Path(outputPath); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job); fs.delete(outPath, true); FileOutputFormat.setOutputPath(job, outPath); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java index db3be0f..05b8a19 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java @@ -75,7 +75,8 @@ public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable // setup the writer for mapper's output // the default part-..... files will be deleted later once the job finishes - br = new BufferedWriter(new OutputStreamWriter(FileSystem.get(_rJob).create( mapOutputPath, true))); + FileSystem fs = IOUtilFunctions.getFileSystem(mapOutputPath); + br = new BufferedWriter(new OutputStreamWriter(fs.create( mapOutputPath, true))); } // output the header line http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java index 722c6a9..4a7839d 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java @@ -724,12 +724,12 @@ public class DataTransform TransformOperands oprnds = new TransformOperands(insts[0], inputs[0]); JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); // find the first file in alphabetical ordering of part files in directory inputPath String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath); // find column names + FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile); String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile); HashMap<String, Integer> colNamesToIds = processColumnNames(fs, oprnds.inputCSVProperties, headerLine, smallestFile); String outHeader = getOutputHeader(fs, headerLine, oprnds); @@ -990,9 +990,10 @@ public class DataTransform throws IOException, DMLRuntimeException, IllegalArgumentException, JSONException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); + // find the first file in alphabetical ordering of partfiles in directory inputPath String smallestFile = CSVReblockMR.findSmallestFile(job, oprnds.inputPath); + FileSystem fs = IOUtilFunctions.getFileSystem(smallestFile); // find column names String headerLine = readHeaderLine(fs, oprnds.inputCSVProperties, smallestFile); @@ -1364,7 +1365,7 @@ public class DataTransform TransformOperands oprnds = new TransformOperands(inst.getParams(), inputs[0]); JobConf job = new JobConf(); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(inputs[0].getFileName()); checkIfOutputOverlapsWithTxMtd(oprnds.txMtdPath, outputs[0].getFileName(), fs); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java index da298ae..68024d9 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java +++ 
b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdMR.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; @@ -70,7 +71,7 @@ public class GenTfMtdMR { FileInputFormat.addInputPath(job, new Path(inputPath)); // delete outputPath, if exists already. Path outPath = new Path(txMtdPath); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(outPath, job); fs.delete(outPath, true); FileOutputFormat.setOutputPath(job, outPath); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java index 58868e0..9e30f5c 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java +++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java @@ -186,9 +186,9 @@ public class TfUtils implements Serializable{ return false; // check for empty file - if (MapReduceTool.isFileEmpty(fs, path.toString())) + if( MapReduceTool.isFileEmpty(fs, path) ) if ( err ) - throw new EOFException("Empty input file " + path.toString() + "."); + throw new EOFException("Empty input file " + path.toString() + "."); else return false; @@ -196,21 +196,18 @@ public class TfUtils implements Serializable{ } public static String getPartFileName(JobConf job) throws IOException { - FileSystem fs = FileSystem.get(job); - Path thisPath=new 
Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs); - return thisPath.toString(); + Path path = new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + path = path.makeQualified(fs); + return path.toString(); } public static boolean isPartFileWithHeader(JobConf job) throws IOException { - FileSystem fs = FileSystem.get(job); - String thisfile=getPartFileName(job); - Path smallestFilePath=new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)).makeQualified(fs); - - if(thisfile.toString().equals(smallestFilePath.toString())) - return true; - else - return false; + Path path = new Path(job.get(MRJobConfiguration.TF_SMALLEST_FILE)); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + path = path.makeQualified(fs); + return getPartFileName(job).equals(path.toString()); } /** @@ -372,8 +369,8 @@ public class TfUtils implements Serializable{ fs = FileSystem.getLocal(job); } else { - fs = FileSystem.get(job); tfMtdDir = new Path(getTfMtdDir()); + fs = IOUtilFunctions.getFileSystem(tfMtdDir, job); } // load transformation metadata @@ -501,7 +498,7 @@ public class TfUtils implements Serializable{ private Reader initOffsetsReader(JobConf job) throws IOException { Path path=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME)); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); Path[] files = MatrixReader.getSequenceFilePaths(fs, path); if ( files.length != 1 ) throw new IOException("Expecting a single file under counters file: " + path.toString()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java index 82f1463..0d74088 100644 --- 
a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java +++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java @@ -43,7 +43,6 @@ import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.lops.Lop; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; -import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.FrameBlock; @@ -441,36 +440,6 @@ public class LocalFileUtils } /** - * Validate external directory and filenames as soon as they enter the system - * in order to prevent security issues such as path traversal, etc. - * Currently, external (user provided) filenames are: scriptfile, config file, - * local tmp working dir, hdfs working dir (scratch), read/write expressions, - * and several export functionalities. - * - * - * @param fname file name - * @param hdfs true if check for HDFS - * @return true if valid filename - */ - public static boolean validateExternalFilename( String fname, boolean hdfs ) - { - boolean ret = true; - - //check read local file from hdfs context - //(note: currently rejected with "wrong fs" anyway but this is impl-specific) - if( hdfs && !InfrastructureAnalyzer.isLocalMode() - && fname.startsWith("file:") ) - { - //prevent redirection to local file system - ret = false; - } - - //TODO white and black lists according to BI requirements - - return ret; - } - - /** * Writes a simple text file to local file system. 
* * @param file output file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java index cd32a0c..e81dbc1 100644 --- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java +++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java @@ -68,12 +68,6 @@ public class MapReduceTool private static final int MAX_DELETE_RETRIES = 10; private static final Log LOG = LogFactory.getLog(MapReduceTool.class.getName()); - private static JobConf _rJob = null; //cached job conf for read-only operations - - static{ - _rJob = ConfigurationManager.getCachedJobConf(); - } - public static String getUniqueKeyPerTask(JobConf job, boolean inMapper) { //TODO: investigate ID pattern, required for parallel jobs @@ -109,10 +103,15 @@ public class MapReduceTool return job.get(MRConfigurationNames.MR_TASK_ATTEMPT_ID); } - public static boolean existsFileOnHDFS(String fname){ + public static boolean existsFileOnHDFS(String fname) { + //robustness for empty strings (e.g., JMLC, MLContext) + if( fname == null || fname.isEmpty() ) + return false; + try { - return FileSystem.get(_rJob) - .exists(new Path(fname)); + Path path = new Path(fname); + return IOUtilFunctions + .getFileSystem(path).exists(path); } catch(Exception ex) { LOG.error("Failed check existsFileOnHDFS.", ex); @@ -121,9 +120,14 @@ public class MapReduceTool } public static boolean isDirectory(String fname) { + //robustness for empty strings (e.g., JMLC, MLContext) + if( fname == null || fname.isEmpty() ) + return false; + try { - return FileSystem.get(_rJob) - .isDirectory(new Path(fname)); + Path path = new Path(fname); + return IOUtilFunctions + .getFileSystem(path).isDirectory(path); } catch(Exception ex) { LOG.error("Failed 
check isDirectory.", ex); @@ -133,8 +137,9 @@ public class MapReduceTool public static FileStatus[] getDirectoryListing(String fname) { try { - return FileSystem.get(_rJob) - .listStatus(new Path(fname)); + Path path = new Path(fname); + return IOUtilFunctions + .getFileSystem(path).listStatus(path); } catch(Exception ex) { LOG.error("Failed listing of directory contents.", ex); @@ -148,11 +153,12 @@ public class MapReduceTool } public static void deleteFileIfExistOnHDFS(String dir) throws IOException { - deleteFileIfExists(FileSystem.get(_rJob), new Path(dir)); + Path path = new Path(dir); + deleteFileIfExists(IOUtilFunctions.getFileSystem(path), path); } public static void deleteFileIfExistOnHDFS(Path outpath, JobConf job) throws IOException { - deleteFileIfExists(FileSystem.get(job), outpath); + deleteFileIfExists(IOUtilFunctions.getFileSystem(outpath, job), outpath); } public static void deleteFileIfExistOnLFS(Path outpath, JobConf job) throws IOException { @@ -169,17 +175,20 @@ public class MapReduceTool } public static boolean isHDFSFileEmpty(String dir) throws IOException { - FileSystem fs = FileSystem.get(_rJob); - return isFileEmpty(fs, dir); + //robustness for empty strings (e.g., JMLC, MLContext) + if( dir == null || dir.isEmpty() ) + return false; + Path path = new Path(dir); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + return isFileEmpty(fs, path); } - public static boolean isFileEmpty(FileSystem fs, String dir) throws IOException { - Path pth = new Path(dir); - FileStatus fstat = fs.getFileStatus(pth); + public static boolean isFileEmpty(FileSystem fs, Path dir) throws IOException { + FileStatus fstat = fs.getFileStatus(dir); if (fstat.isDirectory()) { // it is a directory - FileStatus[] stats = fs.listStatus(pth); + FileStatus[] stats = fs.listStatus(dir); if (stats != null) { for (FileStatus stat : stats) { if (stat.getLen() > 0) @@ -189,33 +198,35 @@ public class MapReduceTool } else { return true; } - } else { + } + else { // it is a 
regular file - if (fstat.getLen() == 0) - return true; - else - return false; + return (fstat.getLen() == 0); } } public static void renameFileOnHDFS(String originalDir, String newDir) throws IOException { - Path originalpath = new Path(originalDir); + Path pathOrig = new Path(originalDir); + Path pathNew = new Path(newDir); + if( !IOUtilFunctions.isSameFileScheme(pathOrig, pathNew) ) + throw new IOException("Cannot rename files to different target file system."); deleteFileIfExistOnHDFS(newDir); - Path newpath = new Path(newDir); - - FileSystem fs = FileSystem.get(_rJob); - if (fs.exists(originalpath)) { - fs.rename(originalpath, newpath); - } - else { + FileSystem fs = IOUtilFunctions.getFileSystem(pathOrig); + if( fs.exists(pathOrig) ) + fs.rename(pathOrig, pathNew); + else throw new FileNotFoundException(originalDir); - } } public static void mergeIntoSingleFile(String originalDir, String newFile) throws IOException { - FileSystem fs = FileSystem.get(_rJob); - FileUtil.copyMerge(fs, new Path(originalDir), fs, new Path(newFile), true, _rJob, null); + Path pathOrig = new Path(originalDir); + Path pathNew = new Path(newFile); + if( !IOUtilFunctions.isSameFileScheme(pathOrig, pathNew) ) + throw new IOException("Cannot merge files into different target file system."); + FileSystem fs = IOUtilFunctions.getFileSystem(pathOrig); + FileUtil.copyMerge(fs, pathOrig, fs, pathNew, true, + ConfigurationManager.getCachedJobConf(), null); } public static void copyFileOnHDFS(String originalDir, String newDir) throws IOException { @@ -225,7 +236,7 @@ public class MapReduceTool boolean overwrite = true; JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(originalPath, job); if (fs.exists(originalPath)) { FileUtil.copy(fs, originalPath, fs, newPath, deleteSource, overwrite, job); } @@ -241,7 +252,7 @@ public class MapReduceTool public static long getFilesizeOnHDFS( Path path ) 
throws IOException { - FileSystem fs = FileSystem.get(_rJob); + FileSystem fs = IOUtilFunctions.getFileSystem(path); long ret = 0; //in bytes if( fs.isDirectory(path) ) ret = fs.getContentSummary(path).getLength(); @@ -253,9 +264,9 @@ public class MapReduceTool } private static BufferedReader setupInputFile ( String filename ) throws IOException { - Path pt=new Path(filename); - FileSystem fs = FileSystem.get(_rJob); - BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(pt))); + Path path = new Path(filename); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path))); return br; } @@ -305,9 +316,9 @@ public class MapReduceTool } private static BufferedWriter setupOutputFile ( String filename ) throws IOException { - Path pt=new Path(filename); - FileSystem fs = FileSystem.get(_rJob); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); + Path path = new Path(filename); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); return br; } @@ -351,17 +362,17 @@ public class MapReduceTool public static MatrixCharacteristics[] processDimsFiles(String dir, MatrixCharacteristics[] stats) throws IOException { - Path pt=new Path(dir); - FileSystem fs = FileSystem.get(_rJob); + Path path = new Path(dir); + FileSystem fs = IOUtilFunctions.getFileSystem(path); - if ( !fs.exists(pt) ) + if ( !fs.exists(path) ) return stats; - FileStatus fstat = fs.getFileStatus(pt); + FileStatus fstat = fs.getFileStatus(path); if ( fstat.isDirectory() ) { - FileStatus[] files = fs.listStatus(pt); + FileStatus[] files = fs.listStatus(path); for ( int i=0; i < files.length; i++ ) { Path filePath = files[i].getPath(); try( BufferedReader br = setupInputFile(filePath.toString()) ) { @@ -405,9 +416,9 @@ public class MapReduceTool OutputInfo outinfo, FileFormatProperties formatProperties) throws 
IOException { - Path pt = new Path(mtdfile); - FileSystem fs = FileSystem.get(_rJob); - try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { + Path path = new Path(mtdfile); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) { String mtd = metaDataToString(vt, schema, dt, mc, outinfo, formatProperties); br.write(mtd); } catch (Exception e) { @@ -418,9 +429,9 @@ public class MapReduceTool public static void writeScalarMetaDataFile(String mtdfile, ValueType vt) throws IOException { - Path pt = new Path(mtdfile); - FileSystem fs = FileSystem.get(_rJob); - try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { + Path path = new Path(mtdfile); + FileSystem fs = IOUtilFunctions.getFileSystem(path); + try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) { String mtd = metaDataToString(vt, null, DataType.SCALAR, null, OutputInfo.TextCellOutputInfo, null); br.write(mtd); } @@ -555,8 +566,8 @@ public class MapReduceTool else offset=(int)pos-1; - FileSystem fs=FileSystem.get(_rJob); Path path=new Path(dir); + FileSystem fs=IOUtilFunctions.getFileSystem(path); FileStatus[] files=fs.listStatus(path); Path fileToRead=null; for(FileStatus file: files) @@ -610,11 +621,16 @@ public class MapReduceTool } public static void createDirIfNotExistOnHDFS(String dir, String permissions) + throws IOException + { + createDirIfNotExistOnHDFS(new Path(dir), permissions); + } + + public static void createDirIfNotExistOnHDFS(Path path, String permissions) throws IOException { - Path path = new Path(dir); try { - FileSystem fs = FileSystem.get(_rJob); + FileSystem fs = IOUtilFunctions.getFileSystem(path); if( !fs.exists(path) ) { char[] c = permissions.toCharArray(); @@ -637,8 +653,8 @@ public class MapReduceTool public static FSDataOutputStream getHDFSDataOutputStream(String filename, 
boolean overwrite) throws IOException { - FileSystem fs = FileSystem.get(_rJob); Path path = new Path(filename); - return fs.create(path, overwrite); + return IOUtilFunctions.getFileSystem(path) + .create(path, overwrite); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java b/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java index b45d225..195a9e0 100644 --- a/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java +++ b/src/main/java/org/apache/sysml/udf/lib/RemoveEmptyRows.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.udf.FunctionParameter; import org.apache.sysml.udf.Matrix; @@ -77,7 +78,7 @@ public class RemoveEmptyRows extends PackageFunction //prepare input JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fnameOld); - FileSystem fs = FileSystem.get(job); + FileSystem fs = IOUtilFunctions.getFileSystem(path, job); if( !fs.exists(path) ) throw new IOException("File "+fnameOld+" does not exist on HDFS."); FileInputFormat.addInputPath(job, path); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java b/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java index 8954951..ba2a9f1 100644 --- a/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java +++ b/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java @@ -43,6 +43,7 @@ import 
org.apache.sysml.api.DMLScript; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.runtime.DMLScriptException; +import org.apache.sysml.runtime.io.IOUtilFunctions; public class DMLAppMaster { @@ -136,7 +137,7 @@ public class DMLAppMaster //write given message to hdfs try { - FileSystem fs = FileSystem.get(_conf); + FileSystem fs = IOUtilFunctions.getFileSystem(msgPath, _conf); try( FSDataOutputStream fout = fs.create(msgPath, true) ) { fout.writeBytes( msg ); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java index c4da603..eb7a492 100644 --- a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java +++ b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java @@ -57,6 +57,7 @@ import org.apache.sysml.parser.ParseException; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.DMLScriptException; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.util.MapReduceTool; /** @@ -264,17 +265,17 @@ public class DMLYarnClient private void copyResourcesToHdfsWorkingDir( YarnConfiguration yconf, String hdfsWD ) throws ParseException, IOException, DMLRuntimeException, InterruptedException { - FileSystem fs = FileSystem.get(yconf); + Path confPath = new Path(hdfsWD, DML_CONFIG_NAME); + FileSystem fs = IOUtilFunctions.getFileSystem(confPath, yconf); //create working directory - MapReduceTool.createDirIfNotExistOnHDFS(hdfsWD, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + MapReduceTool.createDirIfNotExistOnHDFS(confPath, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); //serialize the dml config to HDFS file //NOTE: we do not 
modify and ship the absolute scratch space path of the current user //because this might result in permission issues if the app master is run with a different user //(runtime plan migration during resource reoptimizations now needs to use qualified names //for shipping/reading intermediates) TODO modify resource reoptimizer on prototype integration. - Path confPath = new Path(hdfsWD, DML_CONFIG_NAME); try( FSDataOutputStream fout = fs.create(confPath, true) ) { fout.writeBytes(_dmlConfig.serializeDMLConfig() + "\n"); } @@ -451,7 +452,8 @@ public class DMLYarnClient Path path = new Path(_hdfsJarFile); LocalResource resource = Records.newRecord(LocalResource.class); - FileStatus jarStat = FileSystem.get(yconf).getFileStatus(path); + FileStatus jarStat = IOUtilFunctions + .getFileSystem(path, yconf).getFileStatus(path); resource.setResource(ConverterUtils.getYarnUrlFromPath(path)); resource.setSize(jarStat.getLen()); resource.setTimestamp(jarStat.getModificationTime()); @@ -518,9 +520,8 @@ public class DMLYarnClient //write given message to hdfs try { - FileSystem fs = FileSystem.get(yconf); - if( fs.exists(msgPath) ) - { + FileSystem fs = IOUtilFunctions.getFileSystem(msgPath, yconf); + if( fs.exists(msgPath) ) { try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(msgPath))) ) { ret = br.readLine(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java index b66813d..a80e8b3 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java @@ -34,6 +34,7 @@ import 
org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.ReaderBinaryBlock; import org.apache.sysml.runtime.io.ReaderTextCSV; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; @@ -142,7 +143,7 @@ public class ScalingTest extends AutomatedTestBase } outputSpec.put(TfUtils.TXMETHOD_SCALE, scaleSpec); - FileSystem fs = FileSystem.get(TestUtils.conf); + FileSystem fs = IOUtilFunctions.getFileSystem(specFile); try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(specFile),true))) ) { out.write(outputSpec.toString()); } @@ -157,7 +158,7 @@ public class ScalingTest extends AutomatedTestBase mtd.put("format", "csv"); mtd.put("header", false); - FileSystem fs = FileSystem.get(TestUtils.conf); + FileSystem fs = IOUtilFunctions.getFileSystem(datafile); try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(datafile+".mtd"),true))) ) { out.write(mtd.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/557aae0f/src/test/java/org/apache/sysml/test/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/utils/TestUtils.java b/src/test/java/org/apache/sysml/test/utils/TestUtils.java index 19b741d..beb19a3 100644 --- a/src/test/java/org/apache/sysml/test/utils/TestUtils.java +++ b/src/test/java/org/apache/sysml/test/utils/TestUtils.java @@ -98,9 +98,9 @@ public class TestUtils try { String lineExpected = null; String lineActual = null; - FileSystem fs = FileSystem.get(conf); Path compareFile = new Path(expectedFile); + FileSystem fs = IOUtilFunctions.getFileSystem(compareFile, conf); FSDataInputStream fsin = fs.open(compareFile); try( BufferedReader compareIn = new 
BufferedReader(new InputStreamReader(fsin)) ) { lineExpected = compareIn.readLine(); @@ -130,9 +130,9 @@ public class TestUtils try { HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>(); - FileSystem fs = FileSystem.get(conf); Path outDirectory = new Path(actualDir); Path compareFile = new Path(expectedFile); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); FSDataInputStream fsin = fs.open(compareFile); try( BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin)) ) { String line; @@ -207,9 +207,9 @@ public class TestUtils */ public static void compareMMMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) { try { - FileSystem fs = FileSystem.get(conf); Path outDirectory = new Path(actualDir); Path compareFile = new Path(expectedFile); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); FSDataInputStream fsin = fs.open(compareFile); HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>(); @@ -300,9 +300,9 @@ public class TestUtils */ public static void compareDMLMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) { try { - FileSystem fs = FileSystem.get(conf); Path outDirectory = new Path(actualDir); Path compareFile = new Path(expectedFile); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); FSDataInputStream fsin = fs.open(compareFile); HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>(); @@ -370,8 +370,8 @@ public class TestUtils try { - FileSystem fs = FileSystem.get(conf); Path outDirectory = new Path(filePath); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); String line; FileStatus[] outFiles = fs.listStatus(outDirectory); @@ -462,11 +462,10 @@ public class TestUtils } public static double readDMLScalar(String filePath) { - FileSystem fs; try { double d=Double.NaN; - fs = FileSystem.get(conf); Path outDirectory = new Path(filePath); + FileSystem fs = 
IOUtilFunctions.getFileSystem(outDirectory, conf); String line; FileStatus[] outFiles = fs.listStatus(outDirectory); for (FileStatus file : outFiles) { @@ -485,11 +484,10 @@ public class TestUtils } public static boolean readDMLBoolean(String filePath) { - FileSystem fs; try { Boolean b = null; - fs = FileSystem.get(conf); Path outDirectory = new Path(filePath); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); String line; FileStatus[] outFiles = fs.listStatus(outDirectory); for (FileStatus file : outFiles) { @@ -508,11 +506,10 @@ public class TestUtils } public static String readDMLString(String filePath) { - FileSystem fs; try { StringBuilder sb = new StringBuilder(); - fs = FileSystem.get(conf); Path outDirectory = new Path(filePath); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); FileStatus[] outFiles = fs.listStatus(outDirectory); for (FileStatus file : outFiles) { FSDataInputStream fsout = fs.open(file.getPath()); @@ -989,8 +986,8 @@ public class TestUtils */ public static void compareDMLHDFSFileWithRFile(String rFile, String hdfsDir, double epsilon) { try { - FileSystem fs = FileSystem.get(conf); Path outDirectory = new Path(hdfsDir); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); HashMap<CellIndex, Double> expectedValues = new HashMap<CellIndex, Double>(); HashMap<CellIndex, Double> actualValues = new HashMap<CellIndex, Double>(); try(BufferedReader compareIn = new BufferedReader(new FileReader(rFile))) { @@ -1089,8 +1086,8 @@ public class TestUtils */ public static void checkMatrix(String outDir, long rows, long cols, double min, double max) { try { - FileSystem fs = FileSystem.get(conf); Path outDirectory = new Path(outDir); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); assertTrue(outDir + " does not exist", fs.exists(outDirectory)); if( fs.getFileStatus(outDirectory).isDirectory() ) @@ -1143,8 +1140,8 @@ public class TestUtils */ public static void 
checkForOutputExistence(String outDir) { try { - FileSystem fs = FileSystem.get(conf); Path outDirectory = new Path(outDir); + FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf); FileStatus[] outFiles = fs.listStatus(outDirectory); assertEquals("number of files in directory not 1", 1, outFiles.length); FSDataInputStream fsout = fs.open(outFiles[0].getPath()); @@ -1169,9 +1166,9 @@ public class TestUtils */ public static void removeHDFSDirectories(String[] directories) { try { - FileSystem fs = FileSystem.get(conf); for (String directory : directories) { Path dir = new Path(directory); + FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf); if (fs.exists(dir) && fs.getFileStatus(dir).isDirectory()) { fs.delete(dir, true); } @@ -1219,9 +1216,9 @@ public class TestUtils */ public static void removeHDFSFiles(String[] files) { try { - FileSystem fs = FileSystem.get(conf); for (String directory : files) { Path dir = new Path(directory); + FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf); if (fs.exists(dir) && !fs.getFileStatus(dir).isDirectory()) { fs.delete(dir, false); } @@ -1258,8 +1255,9 @@ public class TestUtils */ public static void clearDirectory(String directory) { try { - FileSystem fs = FileSystem.get(conf); - FileStatus[] directoryContent = fs.listStatus(new Path(directory)); + Path path = new Path(directory); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + FileStatus[] directoryContent = fs.listStatus(path); for (FileStatus content : directoryContent) { fs.delete(content.getPath(), true); } @@ -1368,8 +1366,8 @@ public class TestUtils public static void generateTestMatrixToFile(String file, int rows, int cols, double min, double max, double sparsity, long seed) { try { - FileSystem fs = FileSystem.get(conf); Path inFile = new Path(file); + FileSystem fs = IOUtilFunctions.getFileSystem(inFile, conf); DataOutputStream out = fs.create(inFile); try( PrintWriter pw = new PrintWriter(out) ) { Random random = (seed == -1) ? 
TestUtils.random : new Random(seed); @@ -1411,8 +1409,9 @@ public class TestUtils try { //create outputstream to HDFS / FS and writer - FileSystem fs = FileSystem.get(conf); - DataOutputStream out = fs.create(new Path(file), true); + Path path = new Path(file); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + DataOutputStream out = fs.create(path, true); try( BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) { //writer actual matrix StringBuilder sb = new StringBuilder(); @@ -1457,8 +1456,9 @@ public class TestUtils //create outputstream to HDFS / FS and writer DataOutputStream out = null; if (!isR) { - FileSystem fs = FileSystem.get(conf); - out = fs.create(new Path(file), true); + Path path = new Path(file); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + out = fs.create(path, true); } else { out = new DataOutputStream(new FileOutputStream(file)); @@ -1605,7 +1605,9 @@ public class TestUtils try { SequenceFile.Writer writer = null; try { - writer = new SequenceFile.Writer(FileSystem.get(conf), conf, new Path(file), + Path path = new Path(file); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixCell.class); MatrixIndexes index = new MatrixIndexes(); @@ -1651,7 +1653,9 @@ public class TestUtils SequenceFile.Writer writer = null; try { - writer = new SequenceFile.Writer(FileSystem.get(conf), conf, new Path(file), + Path path = new Path(file); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class); MatrixIndexes index = new MatrixIndexes(); @@ -1774,8 +1778,8 @@ public class TestUtils */ public static void removeTemporaryFiles() { try { - FileSystem fs = FileSystem.get(conf); Path workingDir = new Path("."); + FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf); FileStatus[] files = fs.listStatus(workingDir); for (FileStatus 
file : files) { String fileName = file.getPath().toString().substring( @@ -1799,8 +1803,8 @@ public class TestUtils */ public static boolean checkForTemporaryFiles() { try { - FileSystem fs = FileSystem.get(conf); Path workingDir = new Path("."); + FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf); FileStatus[] files = fs.listStatus(workingDir); for (FileStatus file : files) { String fileName = file.getPath().toString().substring( @@ -1828,8 +1832,9 @@ public class TestUtils */ public static Path getFileInDirectory(String directory) { try { - FileSystem fs = FileSystem.get(conf); - FileStatus[] files = fs.listStatus(new Path(directory)); + Path path = new Path(directory); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + FileStatus[] files = fs.listStatus(path); if (files.length != 1) throw new IOException("requires exactly one file in directory " + directory); @@ -1851,8 +1856,9 @@ public class TestUtils * filename */ public static void createFile(String filename) throws IOException { - FileSystem fs = FileSystem.get(conf); - fs.create(new Path(filename)); + Path path = new Path(filename); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + fs.create(path); } /**
