[MINOR] Fix various i/o resource leaks in parser/compiler/runtime/tests Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/795a94e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/795a94e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/795a94e7
Branch: refs/heads/master Commit: 795a94e7b468dd2535b489c3d4d1e60ef99e413c Parents: 977240d Author: Matthias Boehm <[email protected]> Authored: Sun Mar 19 23:08:10 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sun Mar 19 23:09:00 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/DMLScript.java | 7 +- .../java/org/apache/sysml/api/MLContext.java | 5 +- .../apache/sysml/hops/recompile/Recompiler.java | 4 +- .../org/apache/sysml/parser/AParserWrapper.java | 7 +- .../org/apache/sysml/parser/DataExpression.java | 218 +++----- .../sysml/runtime/codegen/CodegenUtils.java | 11 +- .../controlprogram/ParForProgramBlock.java | 13 +- .../parfor/DataPartitionerLocal.java | 43 +- .../parfor/DataPartitionerRemoteReducer.java | 7 +- .../controlprogram/parfor/RemoteDPParForMR.java | 7 +- .../parfor/RemoteParForColocatedFileSplit.java | 12 +- .../controlprogram/parfor/RemoteParForMR.java | 7 +- .../parfor/ResultMergeLocalFile.java | 61 +-- .../parfor/util/StagingFileUtils.java | 32 +- .../ParameterizedBuiltinCPFileInstruction.java | 54 +- .../instructions/gpu/context/JCudaKernels.java | 21 +- .../instructions/spark/RandSPInstruction.java | 20 +- .../runtime/io/BinaryBlockSerialization.java | 8 +- .../sysml/runtime/io/IOUtilFunctions.java | 16 +- .../sysml/runtime/io/ReaderTextCSVParallel.java | 3 +- .../sysml/runtime/io/WriterBinaryBlock.java | 21 +- .../sysml/runtime/io/WriterBinaryCell.java | 17 +- .../sysml/runtime/io/WriterMatrixMarket.java | 12 +- .../apache/sysml/runtime/io/WriterTextCell.java | 13 +- .../apache/sysml/runtime/matrix/CleanupMR.java | 7 +- .../apache/sysml/runtime/matrix/DataGenMR.java | 140 +++-- .../org/apache/sysml/runtime/matrix/SortMR.java | 8 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 23 +- .../matrix/data/UnPaddedOutputFormat.java | 4 +- .../runtime/matrix/mapred/CSVReblockMapper.java | 15 +- .../mapred/CollectMultipleConvertedOutputs.java | 2 +- .../runtime/matrix/sort/CompactInputFormat.java | 3 +- .../matrix/sort/CompactOutputFormat.java | 3 +- .../matrix/sort/PickFromCompactInputFormat.java | 6 +- .../matrix/sort/SamplingSortMRInputFormat.java | 39 +- .../runtime/transform/ApplyTfBBMapper.java | 20 +- .../runtime/transform/ApplyTfCSVMapper.java | 4 +- .../sysml/runtime/transform/BinAgent.java | 43 +- .../sysml/runtime/transform/DataTransform.java | 227 ++++---- .../sysml/runtime/transform/DummycodeAgent.java | 70 ++- .../sysml/runtime/transform/GTFMTDReducer.java | 29 +- .../sysml/runtime/transform/GenTfMtdSPARK.java | 25 +- .../sysml/runtime/transform/MVImputeAgent.java | 36 +- .../sysml/runtime/transform/RecodeAgent.java | 109 ++-- .../apache/sysml/runtime/transform/TfUtils.java | 26 +- .../sysml/runtime/util/MapReduceTool.java | 184 ++++--- .../InstallDependencyForIntegrationTests.java | 10 +- .../apache/sysml/utils/ParameterBuilder.java | 57 +- .../org/apache/sysml/yarn/DMLAppMaster.java | 6 +- .../org/apache/sysml/yarn/DMLYarnClient.java | 21 +- .../functions/jmlc/FrameCastingTest.java | 7 +- .../functions/jmlc/FrameDecodeTest.java | 7 +- .../functions/jmlc/FrameEncodeTest.java | 7 +- .../functions/jmlc/FrameIndexingAppendTest.java | 7 +- .../functions/jmlc/FrameLeftIndexingTest.java | 7 +- .../functions/jmlc/FrameReadMetaTest.java | 6 +- .../functions/jmlc/FrameTransformTest.java | 7 +- .../functions/jmlc/ReuseModelVariablesTest.java | 7 +- .../functions/misc/DataTypeChangeTest.java | 10 +- .../parfor/ParForDependencyAnalysisTest.java | 10 +- .../functions/transform/ScalingTest.java | 12 +- .../org/apache/sysml/test/utils/TestUtils.java | 540 +++++++++---------- 62 files changed, 1140 insertions(+), 1223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/api/DMLScript.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/DMLScript.java b/src/main/java/org/apache/sysml/api/DMLScript.java index 9959aea..5bf5338 100644 --- a/src/main/java/org/apache/sysml/api/DMLScript.java +++ b/src/main/java/org/apache/sysml/api/DMLScript.java @@ -73,6 +73,7 @@ import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.ExecutionContextFactory; import org.apache.sysml.runtime.instructions.gpu.context.GPUContext; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.controlprogram.parfor.ProgramConverter; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -476,10 +477,8 @@ public class DMLScript LOG.error("Failed to read the script from the file system", ex); throw ex; } - finally - { - if( in != null ) - in.close(); + finally { + IOUtilFunctions.closeSilently(in); } dmlScriptStr = sb.toString(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/api/MLContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java index a5dc859..6382832 100644 --- a/src/main/java/org/apache/sysml/api/MLContext.java +++ b/src/main/java/org/apache/sysml/api/MLContext.java @@ -78,6 +78,7 @@ import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFuncti import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; @@ -1263,8 +1264,8 @@ public class MLContext { if(s1.hasNextInt()) return 1; } finally { - if(s1 != null) s1.close(); - if(s2 != null) s2.close(); + IOUtilFunctions.closeSilently(s1); + IOUtilFunctions.closeSilently(s2); } return 0; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java index 5533686..ecf6aac 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java @@ -95,6 +95,7 @@ import org.apache.sysml.runtime.instructions.cp.IntObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; import org.apache.sysml.runtime.instructions.mr.RandInstruction; import org.apache.sysml.runtime.instructions.mr.SeqInstruction; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; import org.apache.sysml.runtime.matrix.data.FrameBlock; @@ -1949,8 +1950,7 @@ public class Recompiler dop.setDim2((dt==DataType.MATRIX||dt==DataType.FRAME)?Long.parseLong(mtd.get(DataExpression.READCOLPARAM).toString()):0); } finally { - if( br != null ) - br.close(); + IOUtilFunctions.closeSilently(br); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/parser/AParserWrapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/AParserWrapper.java b/src/main/java/org/apache/sysml/parser/AParserWrapper.java index 4e6ee05..a413f53 100644 --- a/src/main/java/org/apache/sysml/parser/AParserWrapper.java +++ b/src/main/java/org/apache/sysml/parser/AParserWrapper.java @@ -34,6 +34,7 @@ import org.apache.sysml.parser.common.CommonSyntacticValidator; import org.apache.sysml.parser.common.CustomErrorListener.ParseIssue; import org.apache.sysml.parser.dml.DMLParserWrapper; import org.apache.sysml.parser.pydml.PyDMLParserWrapper; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.util.LocalFileUtils; /** @@ -147,10 +148,8 @@ public abstract class AParserWrapper LOG.error("Failed to read the script from the file system", ex); throw ex; } - finally - { - if( in != null ) - in.close(); + finally { + IOUtilFunctions.closeSilently(in); } dmlScriptStr = sb.toString(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/parser/DataExpression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DataExpression.java b/src/main/java/org/apache/sysml/parser/DataExpression.java index 9370bdd..9e8dca9 100644 --- a/src/main/java/org/apache/sysml/parser/DataExpression.java +++ b/src/main/java/org/apache/sysml/parser/DataExpression.java @@ -35,6 +35,7 @@ import org.apache.sysml.hops.DataGenOp; import org.apache.sysml.parser.LanguageException.LanguageErrorCodes; import org.apache.sysml.parser.common.CustomErrorListener; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.util.LocalFileUtils; import org.apache.sysml.runtime.util.MapReduceTool; import org.apache.sysml.runtime.util.UtilFunctions; @@ -1912,98 +1913,52 @@ public class DataExpression extends DataIdentifier throws LanguageException { JSONObject retVal = null; - boolean exists = false; - FileSystem fs = null; - - try { - fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - } catch (Exception e){ - raiseValidateError("could not read the configuration file: "+e.getMessage(), false); - } - - Path pt = new Path(filename); - try { - if (fs.exists(pt)){ - exists = true; - } - } catch (Exception e){ - exists = false; - } - - boolean isDirBoolean = false; - try { - if (exists && fs.getFileStatus(pt).isDirectory()) - isDirBoolean = true; - else - isDirBoolean = false; - } - catch(Exception e){ - raiseValidateError("error validing whether path " + pt.toString() + " is directory or not: "+e.getMessage(), conditional); - } + boolean exists = MapReduceTool.existsFileOnHDFS(filename); + boolean isDir = MapReduceTool.isDirectory(filename); // CASE: filename is a directory -- process as a directory - if (exists && isDirBoolean){ - - // read directory contents + if( exists && isDir ) + { retVal = new JSONObject(); - - FileStatus[] stats = null; - - try { - stats = fs.listStatus(pt); - } - catch (Exception e){ - raiseValidateError("for MTD file in directory, error reading directory with MTD file " + pt.toString() + ": " + e.getMessage(), conditional); - } - - for(FileStatus stat : stats){ + for(FileStatus stat : MapReduceTool.getDirectoryListing(filename)) { Path childPath = stat.getPath(); // gives directory name - if (childPath.getName().startsWith("part")){ - - BufferedReader br = null; - try { - br = new BufferedReader(new InputStreamReader(fs.open(childPath))); - } - catch(Exception e){ - raiseValidateError("for MTD file in directory, error reading part of MTD file with path " + childPath.toString() + ": " + e.getMessage(), conditional); - } - - JSONObject childObj = null; - try { - childObj = JSONHelper.parse(br); - } - catch(Exception e){ - raiseValidateError("for MTD file in directory, error parsing part of MTD file with path " + childPath.toString() + ": " + e.getMessage(), conditional); - } - - for( Object obj : childObj.entrySet() ){ + if( !childPath.getName().startsWith("part") ) + continue; + BufferedReader br = null; + try { + FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + br = new BufferedReader(new InputStreamReader(fs.open(childPath))); + JSONObject childObj = JSONHelper.parse(br); + for( Object obj : childObj.entrySet() ){ @SuppressWarnings("unchecked") Entry<Object,Object> e = (Entry<Object, Object>) obj; Object key = e.getKey(); Object val = e.getValue(); retVal.put(key, val); - } + } + } + catch(Exception e){ + raiseValidateError("for MTD file in directory, error parting part of MTD file with path " + childPath.toString() + ": " + e.getMessage(), conditional); } - } // end for + finally { + IOUtilFunctions.closeSilently(br); + } + } } - // CASE: filename points to a file - else if (exists){ - + else if (exists) + { BufferedReader br = null; - - // try reading MTD file try { - br=new BufferedReader(new InputStreamReader(fs.open(pt))); - } catch (Exception e){ - raiseValidateError("error reading MTD file with path " + pt.toString() + ": " + e.getMessage(), conditional); + FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename)))); + retVal = new JSONObject(br); + } + catch (Exception e){ + raiseValidateError("error parsing MTD file with path " + filename + ": " + e.getMessage(), conditional); } - - // try parsing MTD file - try { - retVal = JSONHelper.parse(br); - } catch (Exception e){ - raiseValidateError("error parsing MTD file with path " + pt.toString() + ": " + e.getMessage(), conditional); + finally { + IOUtilFunctions.closeSilently(br); } } @@ -2045,10 +2000,8 @@ public class DataExpression extends DataIdentifier raiseValidateError("MatrixMarket files must begin with a header line.", conditional); } } - finally - { - if( in != null ) - in.close(); + finally { + IOUtilFunctions.closeSilently(in); } } else { @@ -2069,103 +2022,58 @@ public class DataExpression extends DataIdentifier { // Check the MTD file exists. if there is an MTD file, return false. JSONObject mtdObject = readMetadataFile(mtdFileName, conditional); - - if (mtdObject != null) + if (mtdObject != null) return false; - boolean exists = false; - FileSystem fs = null; - - try { - fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - } catch (Exception e){ - LOG.error(this.printErrorLocation() + "could not read the configuration file."); - throw new LanguageException(this.printErrorLocation() + "could not read the configuration file.", e); - } - - Path pt = new Path(inputFileName); - try { - if (fs.exists(pt)){ - exists = true; - } - } catch (Exception e){ - LOG.error(this.printErrorLocation() + "file " + inputFileName + " not found"); - throw new LanguageException(this.printErrorLocation() + "file " + inputFileName + " not found"); - } - - try { - // CASE: filename is a directory -- process as a directory - if (exists && fs.getFileStatus(pt).isDirectory()){ - - // currently, only MM files as files are supported. So, if file is directory, then infer - // likely not MM file - return false; - } - // CASE: filename points to a file - else if (exists){ - - //BufferedReader in = new BufferedReader(new FileReader(filename)); - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(pt))); - + if( MapReduceTool.existsFileOnHDFS(inputFileName) + && !MapReduceTool.isDirectory(inputFileName) ) + { + BufferedReader in = null; + try { + FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + in = new BufferedReader(new InputStreamReader(fs.open(new Path(inputFileName)))); String headerLine = new String(""); if (in.ready()) headerLine = in.readLine(); - in.close(); - - // check that headerline starts with "%%" - // will infer malformed - if( headerLine !=null && headerLine.startsWith("%%") ) - return true; - else - return false; + return (headerLine !=null && headerLine.startsWith("%%")); } - else { - return false; + catch(Exception ex) { + throw new LanguageException("Failed to read mtd file.", ex); + } + finally { + IOUtilFunctions.closeSilently(in); } - - } catch (Exception e){ - return false; } + + return false; } public boolean checkHasDelimitedFormat(String filename, boolean conditional) throws LanguageException { - - // if the MTD file exists, check the format is not binary + // if the MTD file exists, check the format is not binary JSONObject mtdObject = readMetadataFile(filename + ".mtd", conditional); - if (mtdObject != null){ - String formatTypeString = (String)JSONHelper.get(mtdObject,FORMAT_TYPE); - if (formatTypeString != null ) { - if ( formatTypeString.equalsIgnoreCase(FORMAT_TYPE_VALUE_CSV) ) - return true; - else - return false; - } - } - return false; - - // The file format must be specified either in .mtd file or in read() statement - // Therefore, one need not actually read the data to infer the format. + if (mtdObject != null) { + String formatTypeString = (String)JSONHelper.get(mtdObject,FORMAT_TYPE); + return (formatTypeString != null ) && + formatTypeString.equalsIgnoreCase(FORMAT_TYPE_VALUE_CSV); + } + return false; + // The file format must be specified either in .mtd file or in read() statement + // Therefore, one need not actually read the data to infer the format. } public boolean isCSVReadWithUnknownSize() { - boolean ret = false; - Expression format = getVarParam(FORMAT_TYPE); - if( _opcode == DataOp.READ && format!=null && format.toString().equalsIgnoreCase(FORMAT_TYPE_VALUE_CSV) ) - { + if( _opcode == DataOp.READ && format!=null && format.toString().equalsIgnoreCase(FORMAT_TYPE_VALUE_CSV) ) { Expression rows = getVarParam(READROWPARAM); Expression cols = getVarParam(READCOLPARAM); - if( (rows==null || Long.parseLong(rows.toString())<0) - ||(cols==null || Long.parseLong(cols.toString())<0) ) - { - ret = true; - } + return (rows==null || Long.parseLong(rows.toString())<0) + ||(cols==null || Long.parseLong(cols.toString())<0); } - return ret; + return false; } public boolean isRead() http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java index f5470de..7a60a80 100644 --- a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java +++ b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java @@ -103,11 +103,16 @@ public class CodegenUtils throw new RuntimeException("Failed to compile class "+name); //dynamically load compiled class - URLClassLoader classLoader = new URLClassLoader( + URLClassLoader classLoader = null; + try { + classLoader = new URLClassLoader( new URL[]{new File(_workingDir).toURI().toURL(), runDir}, CodegenUtils.class.getClassLoader()); - ret = classLoader.loadClass("codegen."+name); - classLoader.close(); + ret = classLoader.loadClass("codegen."+name); + } + finally { + IOUtilFunctions.closeSilently(classLoader); + } } catch(Exception ex) { throw new DMLRuntimeException(ex); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 3339dca..df50939 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -101,6 +101,7 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject; import org.apache.sysml.runtime.instructions.cp.IntObject; import org.apache.sysml.runtime.instructions.cp.StringObject; import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.utils.Statistics; import org.apache.sysml.yarn.ropt.YarnClusterAnalyzer; @@ -1588,10 +1589,8 @@ public class ParForProgramBlock extends ForProgramBlock { throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex); } - finally - { - if( br !=null ) - br.close(); + finally { + IOUtilFunctions.closeSilently(br); } return fname; @@ -1620,10 +1619,8 @@ public class ParForProgramBlock extends ForProgramBlock { throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex); } - finally - { - if( br !=null ) - br.close(); + finally { + IOUtilFunctions.closeSilently(br); } return fname; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java index 8fcef65..d16d8a5 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerLocal.java @@ -46,6 +46,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.util.Cell; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.InputInfo; @@ -184,10 +185,8 @@ public class DataPartitionerLocal extends DataPartitioner buffer.clear(); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } @@ -274,10 +273,8 @@ public class DataPartitionerLocal extends DataPartitioner buffer.clear(); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } @@ -359,10 +356,8 @@ public class DataPartitionerLocal extends DataPartitioner appendBlockToStagingArea(fnameStaging, value, row_offset, col_offset, brlen, bclen); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } @@ -466,10 +461,8 @@ public class DataPartitionerLocal extends DataPartitioner buffer.clear(); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } @@ -635,10 +628,8 @@ public class DataPartitionerLocal extends DataPartitioner } } } - finally - { - if( writer != null ) - writer.close(); + finally { + IOUtilFunctions.closeSilently(writer); } } @@ -668,10 +659,8 @@ public class DataPartitionerLocal extends DataPartitioner } } } - finally - { - if( writer != null ) - writer.close(); + finally { + IOUtilFunctions.closeSilently(writer); } } @@ -704,10 +693,8 @@ public class DataPartitionerLocal extends DataPartitioner } } } - finally - { - if( out != null ) - out.close(); + finally { + IOUtilFunctions.closeSilently(out); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java index 8a9ddce..d493e90 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteReducer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixCell; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -151,10 +152,8 @@ public class DataPartitionerRemoteReducer _sb.setLength(0); } } - finally - { - if( writer != null ) - writer.close(); + finally { + IOUtilFunctions.closeSilently(writer); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java index 39af0e5..293150e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForMR.java @@ -51,6 +51,7 @@ import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell; import org.apache.sysml.runtime.instructions.cp.Data; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.OutputInfo; @@ -267,10 +268,8 @@ public class RemoteDPParForMR countAll++; } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java index 0b01569..e4895cb 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForColocatedFileSplit.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.lib.NLineInputFormat; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType; import org.apache.sysml.runtime.instructions.cp.IntObject; +import org.apache.sysml.runtime.io.IOUtilFunctions; /** @@ -85,9 +86,14 @@ public class RemoteParForColocatedFileSplit extends FileSplit //read task string LongWritable key = new LongWritable(); Text value = new Text(); - RecordReader<LongWritable,Text> reader = new NLineInputFormat().getRecordReader(this, job, Reporter.NULL); - reader.next(key, value); - reader.close(); + RecordReader<LongWritable,Text> reader = null; + try { + reader = new NLineInputFormat().getRecordReader(this, job, Reporter.NULL); + reader.next(key, value); + } + finally { + IOUtilFunctions.closeSilently(reader); + } //parse task Task t = Task.parseCompactString( value.toString() ); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java index 7c9332d..465136d 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForMR.java @@ -49,6 +49,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.stat.Stat; import org.apache.sysml.runtime.instructions.cp.Data; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; @@ -278,10 +279,8 @@ public class RemoteParForMR countAll++; } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java index 7e50f0b..8e201d0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -49,6 +49,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.util.Cell; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.runtime.controlprogram.parfor.util.StagingFileUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.MatrixFormatMetaData; @@ -254,18 +255,14 @@ public class ResultMergeLocalFile extends ResultMerge out.write( valueStr+"\n" ); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } } } - finally - { - if( out != null ) - out.close(); + finally { + IOUtilFunctions.closeSilently(out); } } catch(Exception ex) @@ -353,18 +350,14 @@ public class ResultMergeLocalFile extends ResultMerge out.append(key, value); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } } } - finally - { - if( out != null ) - out.close(); + finally { + IOUtilFunctions.closeSilently(out); } } catch(Exception ex) @@ -500,10 +493,8 @@ public class ResultMergeLocalFile extends ResultMerge } } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } } @@ -563,10 +554,8 @@ public class ResultMergeLocalFile extends ResultMerge buffer.clear(); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } } @@ -611,10 +600,8 @@ public class ResultMergeLocalFile extends ResultMerge buffer.clear(); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } } @@ -749,10 +736,8 @@ public class ResultMergeLocalFile extends ResultMerge writer.append(indexes, mb); } } - finally - { - if( writer != null ) - writer.close(); + finally { + IOUtilFunctions.closeSilently(writer); } } @@ -887,10 +872,8 @@ public class ResultMergeLocalFile extends ResultMerge if( !written ) out.write("1 1 0\n"); } - finally - { - if( out != null ) - out.close(); + finally { + IOUtilFunctions.closeSilently(out); } } @@ -1017,10 +1000,8 @@ public class ResultMergeLocalFile extends ResultMerge if( !written ) out.append(indexes,cell); } - finally - { - if( out != null ) - out.close(); + finally { + IOUtilFunctions.closeSilently(out); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/StagingFileUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/StagingFileUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/StagingFileUtils.java index 391c75f..81dc9a0 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/StagingFileUtils.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/util/StagingFileUtils.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.LinkedList; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.util.FastStringTokenizer; @@ -61,10 +62,8 @@ public class StagingFileUtils sb.setLength(0); } } - finally - { - if( out != null ) - out.close(); + finally { + IOUtilFunctions.closeSilently(out); } } @@ -88,10 +87,8 @@ public class StagingFileUtils sb.setLength(0); } } - finally - { - if( out != null ) - out.close(); + finally { + IOUtilFunctions.closeSilently(out); } } @@ -151,13 +148,6 @@ public class StagingFileUtils return len; } - - public static void closeKeyMap( BufferedReader in ) - throws IOException - { - if( in != null ) - in.close(); - } public static LinkedList<Cell> readCellListFromLocal( String fname ) throws IOException @@ -179,10 +169,8 @@ public class StagingFileUtils buffer.addLast( c ); } } - finally - { - if( in != null ) - in.close(); + finally { + IOUtilFunctions.closeSilently(in); } return buffer; @@ -232,10 +220,8 @@ public class StagingFileUtils tmp.recomputeNonZeros(); } } - finally - { - if( in != null ) - in.close(); + finally { + IOUtilFunctions.closeSilently(in); } //finally change internal representation if required http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java index 45cc472..715bf4f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cpfile/ParameterizedBuiltinCPFileInstruction.java @@ -56,6 +56,7 @@ import org.apache.sysml.runtime.functionobjects.ValueFunction; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.io.MatrixWriter; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -307,10 +308,8 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC buffer.clear(); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } } @@ -357,10 +356,8 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC buffer.clear(); } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } } @@ -406,10 +403,8 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC } } } - finally - { - if( reader != null ) - reader.close(); + finally { + IOUtilFunctions.closeSilently(reader); } } @@ -724,12 +719,9 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC //Note: no need to handle empty result } - finally - { - if( twriter != null ) - twriter.close(); - if( bwriter != null ) - bwriter.close(); + finally { + IOUtilFunctions.closeSilently(twriter); + IOUtilFunctions.closeSilently(bwriter); } } @@ -811,8 +803,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC blockRowOut++; } - if( fkeyMap != null ) - StagingFileUtils.closeKeyMap(fkeyMap); + IOUtilFunctions.closeSilently(fkeyMap); } } else @@ -874,18 +865,14 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC writer.append(key, block); blockColOut++; } - - if( fkeyMap != null ) - StagingFileUtils.closeKeyMap(fkeyMap); + IOUtilFunctions.closeSilently(fkeyMap); } } //Note: no handling of empty matrices necessary } - finally - { - if( writer != null ) - writer.close(); + finally { + IOUtilFunctions.closeSilently(writer); } } @@ -962,9 +949,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC } } } - if( fkeyMap != null ) - StagingFileUtils.closeKeyMap(fkeyMap); - + IOUtilFunctions.closeSilently(fkeyMap); } else //cols { @@ -1022,8 +1007,7 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC } } } - if( fkeyMap != null ) - StagingFileUtils.closeKeyMap(fkeyMap); + IOUtilFunctions.closeSilently(fkeyMap); } //write remaining empty blocks @@ -1047,10 +1031,8 @@ public class ParameterizedBuiltinCPFileInstruction extends ParameterizedBuiltinC if( countBlk1 != countBlk2 ) throw new DMLRuntimeException("Wrong number of written result blocks: "+countBlk1+" vs "+countBlk2+"."); } - finally - { - if( writer != null ) - writer.close(); + finally { + IOUtilFunctions.closeSilently(writer); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaKernels.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaKernels.java b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaKernels.java index f24c320..d858b0b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaKernels.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/gpu/context/JCudaKernels.java @@ -34,6 +34,7 @@ import java.util.HashMap; import jcuda.runtime.JCuda; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.IOUtilFunctions; import jcuda.CudaException; import jcuda.Pointer; @@ -207,22 +208,8 @@ public class JCudaKernels { throw new DMLRuntimeException("Could not initialize the kernels", e); } finally { - if (out != null) { - try { - out.close(); - } - catch (IOException e) { - throw new DMLRuntimeException("Could not initialize the kernels", e); - } - } - if (in != null) { - try { - in.close(); - } - catch (IOException e) { - throw new DMLRuntimeException("Could not initialize the kernels", e); - } - } - } + IOUtilFunctions.closeSilently(out); + IOUtilFunctions.closeSilently(in); + } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java index fe1813f..b2f3503 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java @@ -30,7 +30,6 @@ import java.util.stream.LongStream; import org.apache.commons.math3.distribution.PoissonDistribution; import org.apache.commons.math3.random.Well1024a; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaPairRDD; @@ -387,12 +386,11 @@ public class RandSPInstruction extends UnarySPInstruction else { String path = LibMatrixDatagen.generateUniqueSeedPath(dir); - + PrintWriter pw = null; try { FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - FSDataOutputStream fsOut = fs.create(new Path(path)); - PrintWriter pw = new PrintWriter(fsOut); + pw = new PrintWriter(fs.create(new Path(path))); StringBuilder sb = new StringBuilder(); for( long i=0; i<numBlocks; i++ ) { sb.append(1 + i/numColBlocks); @@ -405,12 +403,13 @@ public class RandSPInstruction extends UnarySPInstruction pw.println(sb.toString()); sb.setLength(0); } - pw.close(); - fsOut.close(); } catch( IOException ex ) { throw new DMLRuntimeException(ex); } + finally { + IOUtilFunctions.closeSilently(pw); + } //for load balancing: degree of parallelism such that ~128MB per partition int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1); @@ -479,21 +478,22 @@ public class RandSPInstruction extends UnarySPInstruction { String path = LibMatrixDatagen.generateUniqueSeedPath(dir); + PrintWriter pw = null; try { FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - FSDataOutputStream fsOut = fs.create(new Path(path)); - PrintWriter pw = new PrintWriter(fsOut); + pw = new PrintWriter(fs.create(new Path(path))); for( long i=0; i<numBlocks; i++ ) { double off = seq_from + seq_incr*i*rowsInBlock; pw.println(off); } - pw.close(); - fsOut.close(); } catch( IOException ex ) { throw new DMLRuntimeException(ex); } + finally { + IOUtilFunctions.closeSilently(pw); + } //for load balancing: degree of parallelism such that ~128MB per partition int numPartitions = (int) Math.max(Math.min(totalSize/hdfsBlkSize, numBlocks), 1); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java b/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java index 074bb8d..c1361b8 100644 --- a/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java +++ b/src/main/java/org/apache/sysml/runtime/io/BinaryBlockSerialization.java @@ -99,8 +99,7 @@ public class BinaryBlockSerialization implements Serialization public void close() throws IOException { - if( _in != null ) - _in.close(); + IOUtilFunctions.closeSilently(_in); } } @@ -128,9 +127,8 @@ public class BinaryBlockSerialization implements Serialization @Override public void close() throws IOException - { - if( _out != null ) - _out.close(); + { + IOUtilFunctions.closeSilently(_out); } } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index bc9f08f..8f5e0df 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -339,12 +339,16 @@ public class IOUtilFunctions public static String toString(InputStream input) throws IOException { if( input == null ) return null; - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - byte[] buff = new byte[LocalFileUtils.BUFFER_SIZE]; - for( int len=0; (len=input.read(buff))!=-1; ) - bos.write(buff, 0, len); - input.close(); - return bos.toString("UTF-8"); + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + byte[] buff = new byte[LocalFileUtils.BUFFER_SIZE]; + for( int len=0; (len=input.read(buff))!=-1; ) + bos.write(buff, 0, len); + return bos.toString("UTF-8"); + } + finally { + IOUtilFunctions.closeSilently(input); + } } public static InputSplit[] sortInputSplits(InputSplit[] splits) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index d5fd624..15d4858 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -450,8 +450,7 @@ public class ReaderTextCSVParallel extends MatrixReader } } finally { - if (reader != null) - reader.close(); + IOUtilFunctions.closeSilently(reader); } } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java index fed0eeb..cdc49fb 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java @@ -78,15 +78,20 @@ public class WriterBinaryBlock extends MatrixWriter Path path = new Path( fname ); FileSystem fs = FileSystem.get(job); - SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, - MatrixIndexes.class, MatrixBlock.class); + SequenceFile.Writer writer = null; + try { + writer = new SequenceFile.Writer(fs, job, path, + MatrixIndexes.class, MatrixBlock.class); + + MatrixIndexes index = new MatrixIndexes(1, 1); + MatrixBlock block = new MatrixBlock((int)Math.min(rlen, brlen), + (int)Math.min(clen, bclen), true); + writer.append(index, block); + } + finally { + IOUtilFunctions.closeSilently(writer); + } - MatrixIndexes index = new MatrixIndexes(1, 1); - MatrixBlock block = new MatrixBlock((int)Math.min(rlen, brlen), - (int)Math.min(clen, bclen), true); - writer.append(index, block); - writer.close(); - IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java index e1943ed..c451f39 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java @@ -64,14 +64,19 @@ public class WriterBinaryCell extends MatrixWriter Path path = new Path( fname ); FileSystem fs = FileSystem.get(job); - SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, + SequenceFile.Writer writer = null; + try { + writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); - MatrixIndexes index = new MatrixIndexes(1, 1); - MatrixCell cell = new MatrixCell(0); - writer.append(index, cell); - writer.close(); - + MatrixIndexes index = new MatrixIndexes(1, 1); + MatrixCell cell = new MatrixCell(0); + writer.append(index, cell); + } + finally { + IOUtilFunctions.closeSilently(writer); + } + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java index 9a5cbdb..9838d01 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java @@ -71,10 +71,14 @@ public class WriterMatrixMarket extends MatrixWriter Path path = new Path( fname ); FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - FSDataOutputStream writer = fs.create(path); - writer.writeBytes("1 1 0"); - writer.close(); - + FSDataOutputStream writer = null; + try { + writer = fs.create(path); + writer.writeBytes("1 1 0"); + } + finally { + IOUtilFunctions.closeSilently(writer); + } IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java index 328a2ed..9389bdc 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java @@ -66,10 +66,15 @@ public class WriterTextCell extends MatrixWriter Path path = new Path( fname ); FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - FSDataOutputStream writer = fs.create(path); - writer.writeBytes("1 1 0"); - writer.close(); - + FSDataOutputStream writer = null; + try{ + writer = fs.create(path); + writer.writeBytes("1 1 0"); + } + finally{ + IOUtilFunctions.closeSilently(writer); + } + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java index cdc9e49..3dee28d 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/CleanupMR.java @@ -44,6 +44,7 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.conf.DMLConfig; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; import org.apache.sysml.runtime.util.LocalFileUtils; @@ -122,10 +123,8 @@ public class CleanupMR { throw new DMLRuntimeException("Error writing cleanup tasks to taskfile "+path.toString(), ex); } - finally - { - if( br != null ) - br.close(); + finally { + IOUtilFunctions.closeSilently(br); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java index 00ae9d3..2aecec2 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/DataGenMR.java @@ -28,7 +28,6 @@ import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.math3.random.Well1024a; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; @@ -48,6 +47,7 @@ import org.apache.sysml.runtime.instructions.mr.MRInstruction; import org.apache.sysml.runtime.instructions.mr.MRInstruction.MRINSTRUCTION_TYPE; import org.apache.sysml.runtime.instructions.mr.RandInstruction; import org.apache.sysml.runtime.instructions.mr.SeqInstruction; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.LibMatrixDatagen; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -146,40 +146,43 @@ public class DataGenMR inputs[i]=LibMatrixDatagen.generateUniqueSeedPath(genInst.getBaseDir()); maxsparsity = Math.max(maxsparsity, randInst.getSparsity()); - FSDataOutputStream fsOut = fs.create(new Path(inputs[i])); - PrintWriter pw = new PrintWriter(fsOut); - - //for obj reuse and preventing repeated buffer re-allocations - StringBuilder sb = new StringBuilder(); - - //seed generation - Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(randInst.getSeed()); - LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], randInst.getSparsity()); - PrimitiveIterator.OfLong nnzIter = nnz.iterator(); - for(long r = 0; r < rlens[i]; r += brlens[i]) { - long curBlockRowSize = Math.min(brlens[i], (rlens[i] - r)); - for(long c = 0; c < clens[i]; c += bclens[i]) - { - long curBlockColSize = Math.min(bclens[i], (clens[i] - c)); - - sb.append((r / brlens[i]) + 1); - sb.append(','); - sb.append((c / bclens[i]) + 1); - sb.append(','); - sb.append(curBlockRowSize); - sb.append(','); - sb.append(curBlockColSize); - sb.append(','); - sb.append(nnzIter.nextLong()); - sb.append(','); - sb.append(bigrand.nextLong()); - pw.println(sb.toString()); - sb.setLength(0); - numblocks++; + PrintWriter pw = null; + try { + pw = new PrintWriter(fs.create(new Path(inputs[i]))); + + //for obj reuse and preventing repeated buffer re-allocations + StringBuilder sb = new StringBuilder(); + + //seed generation + Well1024a bigrand = LibMatrixDatagen.setupSeedsForRand(randInst.getSeed()); + LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rlens[i], clens[i], brlens[i], bclens[i], randInst.getSparsity()); + PrimitiveIterator.OfLong nnzIter = nnz.iterator(); + for(long r = 0; r < rlens[i]; r += brlens[i]) { + long curBlockRowSize = Math.min(brlens[i], (rlens[i] - r)); + for(long c = 0; c < clens[i]; c += bclens[i]) + { + long curBlockColSize = Math.min(bclens[i], (clens[i] - c)); + + sb.append((r / brlens[i]) + 1); + sb.append(','); + sb.append((c / bclens[i]) + 1); + sb.append(','); + sb.append(curBlockRowSize); + sb.append(','); + sb.append(curBlockColSize); + sb.append(','); + sb.append(nnzIter.nextLong()); + sb.append(','); + sb.append(bigrand.nextLong()); + pw.println(sb.toString()); + sb.setLength(0); + numblocks++; + } } } - pw.close(); - fsOut.close(); + finally { + IOUtilFunctions.closeSilently(pw); + } inputInfos[i] = InputInfo.TextCellInputInfo; } else if ( mrtype == MRINSTRUCTION_TYPE.Seq ) { @@ -217,46 +220,41 @@ public class DataGenMR else clens[i] = 1; - FSDataOutputStream fsOut = fs.create(new Path(inputs[i])); - PrintWriter pw = new PrintWriter(fsOut); - StringBuilder sb = new StringBuilder(); - - double temp = from; - double block_from, block_to; - for(long r = 0; r < rlens[i]; r += brlens[i]) { - long curBlockRowSize = Math.min(brlens[i], (rlens[i] - r)); + PrintWriter pw = null; + try { + pw = new PrintWriter(fs.create(new Path(inputs[i]))); + StringBuilder sb = new StringBuilder(); - // block (bid_i,bid_j) generates a sequence from the interval [block_from, block_to] (inclusive of both end points of the interval) - long bid_i = ((r / brlens[i]) + 1); - long bid_j = 1; - block_from = temp; - block_to = temp+(curBlockRowSize-1)*incr; - temp = block_to + incr; // next block starts from here - - sb.append(bid_i); - sb.append(','); - sb.append(bid_j); - sb.append(','); - /* - // Need not include block size while generating seq() - sb.append(curBlockRowSize); - sb.append(','); - sb.append(1); - sb.append(',');*/ - sb.append(block_from); - sb.append(','); - sb.append(block_to); - sb.append(','); - sb.append(incr); - - pw.println(sb.toString()); - //System.out.println("MapTask " + r + ": " + sb.toString()); - sb.setLength(0); - numblocks++; + double temp = from; + double block_from, block_to; + for(long r = 0; r < rlens[i]; r += brlens[i]) { + long curBlockRowSize = Math.min(brlens[i], (rlens[i] - r)); + + // block (bid_i,bid_j) generates a sequence from the interval [block_from, block_to] (inclusive of both end points of the interval) + long bid_i = ((r / brlens[i]) + 1); + long bid_j = 1; + block_from = temp; + block_to = temp+(curBlockRowSize-1)*incr; + temp = block_to + incr; // next block starts from here + + sb.append(bid_i); + sb.append(','); + sb.append(bid_j); + sb.append(','); + sb.append(block_from); + sb.append(','); + sb.append(block_to); + sb.append(','); + sb.append(incr); + + pw.println(sb.toString()); + sb.setLength(0); + numblocks++; + } + } + finally { + IOUtilFunctions.closeSilently(pw); } - - pw.close(); - fsOut.close(); inputInfos[i] = InputInfo.TextCellInputInfo; } else { throw new DMLRuntimeException("Unexpected Data Generation Instruction Type: " + mrtype ); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java index 4c7d257..4bbc6a3 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java @@ -117,16 +117,15 @@ public class SortMR private ArrayList<WritableComparable> readPartitions(FileSystem fs, Path p, JobConf job) throws IOException { - SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job); ArrayList<WritableComparable> parts = new ArrayList<WritableComparable>(); + SequenceFile.Reader reader = null; try { - //WritableComparable key = keyClass.newInstance(); - DoubleWritable key = new DoubleWritable(); + reader = new SequenceFile.Reader(fs, p, job); + DoubleWritable key = new DoubleWritable(); NullWritable value = NullWritable.get(); while (reader.next(key, value)) { parts.add(key); - //key=keyClass.newInstance(); key = new DoubleWritable(); } } @@ -137,7 +136,6 @@ public class SortMR IOUtilFunctions.closeSilently(reader); } - reader.close(); return parts; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 005f24e..2d6e2b2 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -62,6 +62,7 @@ import org.apache.sysml.runtime.functionobjects.SwapIndex; import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; import org.apache.sysml.runtime.instructions.cp.KahanObject; import org.apache.sysml.runtime.instructions.cp.ScalarObject; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.LibMatrixBincell.BinaryAccessType; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; @@ -1824,9 +1825,14 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab { //workaround because sequencefile.reader.next(key, value) does not yet support serialization framework DataInputBuffer din = (DataInputBuffer)in; - MatrixBlockDataInput mbin = new FastBufferedDataInputStream(din); - nonZeros = mbin.readDoubleArray(limit, denseBlock); - ((FastBufferedDataInputStream)mbin).close(); + FastBufferedDataInputStream mbin = null; + try { + mbin = new FastBufferedDataInputStream(din); + nonZeros = mbin.readDoubleArray(limit, denseBlock); + } + finally { + IOUtilFunctions.closeSilently(mbin); + } } else //default deserialize { @@ -1854,9 +1860,14 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab { //workaround because sequencefile.reader.next(key, value) does not yet support serialization framework DataInputBuffer din = (DataInputBuffer)in; - MatrixBlockDataInput mbin = new FastBufferedDataInputStream(din); - nonZeros = mbin.readSparseRows(rlen, sparseBlock); - ((FastBufferedDataInputStream)mbin).close(); + FastBufferedDataInputStream mbin = null; + try { + mbin = new FastBufferedDataInputStream(din); + nonZeros = mbin.readSparseRows(rlen, sparseBlock); + } + finally { + IOUtilFunctions.closeSilently(mbin); + } } else //default deserialize { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java index 3c4e59a..83575f0 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/UnPaddedOutputFormat.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.mapred.MRConfigurationNames; public class UnPaddedOutputFormat<K extends Writable, V extends Writable> extends FileOutputFormat<K, V> @@ -46,8 +47,7 @@ public class UnPaddedOutputFormat<K extends Writable, V extends Writable> extend } @Override public void close(Reporter report) throws IOException { - out.close(); - + IOUtilFunctions.closeSilently(out); } @Override public void write(K key, V value) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java index f371b70..8fcc725 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CSVReblockMapper.java @@ -162,12 +162,17 @@ public class CSVReblockMapper extends MapperBase implements Mapper<LongWritable, ByteWritable key=new ByteWritable(); OffsetCount value=new OffsetCount(); Path p=new Path(job.get(CSVReblockMR.ROWID_FILE_NAME)); - SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job); - while (reader.next(key, value)) { - if(key.get()==matrixIndex && filename.equals(value.filename)) - offsetMap.put(value.fileOffset, value.count); + SequenceFile.Reader reader = null; + try { + reader = new SequenceFile.Reader(fs, p, job); + while (reader.next(key, value)) { + if(key.get()==matrixIndex && filename.equals(value.filename)) + offsetMap.put(value.fileOffset, value.count); + } + } + finally { + IOUtilFunctions.closeSilently(reader); } - reader.close(); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/mapred/CollectMultipleConvertedOutputs.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CollectMultipleConvertedOutputs.java b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CollectMultipleConvertedOutputs.java index 0934c36..6079677 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/mapred/CollectMultipleConvertedOutputs.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/mapred/CollectMultipleConvertedOutputs.java @@ -71,7 +71,7 @@ public class CollectMultipleConvertedOutputs public void close() throws IOException - { + { multipleOutputs.close(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java index 8176cb8..a0faa15 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactInputFormat.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; @SuppressWarnings("rawtypes") public class CompactInputFormat<K extends WritableComparable, V extends Writable> extends FileInputFormat<K, V> @@ -86,7 +87,7 @@ public class CompactInputFormat<K extends WritableComparable, V extends Writable @Override public void close() throws IOException { - currentStream.close(); + IOUtilFunctions.closeSilently(currentStream); } @SuppressWarnings("unchecked") public K createKey() { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactOutputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactOutputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactOutputFormat.java index 6b03656..cd730cb 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactOutputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/CompactOutputFormat.java @@ -32,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Progressable; +import org.apache.sysml.runtime.io.IOUtilFunctions; public class CompactOutputFormat<K extends Writable, V extends Writable> extends FileOutputFormat<K,V> { @@ -74,7 +75,7 @@ public class CompactOutputFormat<K extends Writable, V extends Writable> extends if (finalSync) { ((FSDataOutputStream) out).sync(); } - out.close(); + IOUtilFunctions.closeSilently(out); } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java index 5db07b3..3559593 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java @@ -40,7 +40,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; - +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.MatrixCell; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.NumItemsByEachReducerMetaData; @@ -342,7 +342,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M @Override public void close() throws IOException { - currentStream.close(); + IOUtilFunctions.closeSilently(currentStream); } @Override @@ -449,7 +449,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M @Override public void close() throws IOException { - currentStream.close(); + IOUtilFunctions.closeSilently(currentStream); } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java index 7933020..3f6cd05 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/SamplingSortMRInputFormat.java @@ -38,7 +38,7 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileRecordReader; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; - +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.Converter; import org.apache.sysml.runtime.matrix.data.MatrixCell; import org.apache.sysml.runtime.matrix.data.Pair; @@ -149,23 +149,28 @@ extends SequenceFileInputFormat<K,V> } //note: key value always double/null as expected by partitioner - SequenceFile.Writer writer = - SequenceFile.createWriter(outFs, conf, partFile, DoubleWritable.class, NullWritable.class); - NullWritable nullValue = NullWritable.get(); - int index0=-1, i=0; - boolean lessthan0=true; - for(WritableComparable splitValue : sampler.createPartitions(partitions)) - { - writer.append(splitValue, nullValue); - if(lessthan0 && ((DoubleWritable)splitValue).get()>=0) { - index0=i; - lessthan0=false; - } - i++; + SequenceFile.Writer writer = null; + int index0 = -1; + try { + writer = SequenceFile.createWriter(outFs, conf, partFile, DoubleWritable.class, NullWritable.class); + NullWritable nullValue = NullWritable.get(); + int i = 0; + boolean lessthan0=true; + for(WritableComparable splitValue : sampler.createPartitions(partitions)) + { + writer.append(splitValue, nullValue); + if(lessthan0 && ((DoubleWritable)splitValue).get()>=0) { + index0=i; + lessthan0=false; + } + i++; + } + if(lessthan0) + index0=partitions-1; + } + finally { + IOUtilFunctions.closeSilently(writer); } - if(lessthan0) - index0=partitions-1; - writer.close(); return index0; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java index 580cc87..6d9cb1f 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfBBMapper.java @@ -36,6 +36,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.wink.json4j.JSONException; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.CSVReblockMR; import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; import org.apache.sysml.runtime.matrix.data.TaggedFirstSecondIndexes; @@ -75,14 +76,19 @@ public class ApplyTfBBMapper extends MapperBase implements Mapper<LongWritable, Path thisPath=new Path(job.get(MRConfigurationNames.MR_MAP_INPUT_FILE)).makeQualified(fs); String thisfile=thisPath.toString(); - SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job); - while (reader.next(key, value)) { - // "key" needn't be checked since the offset file has information about a single CSV input (the raw data file) - if(thisfile.equals(value.filename)) - offsetMap.put(value.fileOffset, value.count); + SequenceFile.Reader reader = null; + try { + reader = new SequenceFile.Reader(fs, p, job); + while (reader.next(key, value)) { + // "key" needn't be checked since the offset file has information about a single CSV input (the raw data file) + if(thisfile.equals(value.filename)) + offsetMap.put(value.fileOffset, value.count); + } } - reader.close(); - + finally { + IOUtilFunctions.closeSilently(reader); + } + idxRow = new CSVReblockMapper.IndexedBlockRow(); int maxBclen=0; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java index 7fb1ccc..db3be0f 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java +++ b/src/main/java/org/apache/sysml/runtime/transform/ApplyTfCSVMapper.java @@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.wink.json4j.JSONException; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.IOUtilFunctions; public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable, Text> { @@ -105,8 +106,7 @@ public class ApplyTfCSVMapper implements Mapper<LongWritable, Text, NullWritable @Override public void close() throws IOException { - if ( br != null ) - br.close(); + IOUtilFunctions.closeSilently(br); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java index 8bfa572..8878ff0 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/BinAgent.java @@ -40,6 +40,7 @@ import org.apache.wink.json4j.JSONArray; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; import org.apache.sysml.lops.Lop; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.Pair; @@ -188,9 +189,14 @@ public class BinAgent extends Encoder private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); - br.close(); + BufferedWriter br = null; + try { + br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); + br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); + } + finally { + IOUtilFunctions.closeSilently(br); + } } /** @@ -273,23 +279,26 @@ public class BinAgent extends Encoder Path path = new Path( txMtdDir + "/Bin/" + agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); - // format: colID,min,max,nbins - String[] fields = br.readLine().split(TfUtils.TXMTD_SEP); - double min = UtilFunctions.parseToDouble(fields[1]); - //double max = UtilFunctions.parseToDouble(fields[2]); - double binwidth = UtilFunctions.parseToDouble(fields[3]); - int nbins = UtilFunctions.parseToInt(fields[4]); - - _numBins[i] = nbins; - _min[i] = min; - _binWidths[i] = binwidth; // (max-min)/nbins; - - br.close(); + BufferedReader br = null; + try { + br = new BufferedReader(new InputStreamReader(fs.open(path))); + // format: colID,min,max,nbins + String[] fields = br.readLine().split(TfUtils.TXMTD_SEP); + double min = UtilFunctions.parseToDouble(fields[1]); + //double max = UtilFunctions.parseToDouble(fields[2]); + double binwidth = UtilFunctions.parseToDouble(fields[3]); + int nbins = UtilFunctions.parseToInt(fields[4]); + + _numBins[i] = nbins; + _min[i] = min; + _binWidths[i] = binwidth; // (max-min)/nbins; + } + finally { + IOUtilFunctions.closeSilently(br); + } } } else { - fs.close(); throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir); } }
