http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java index 6559309..722c6a9 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DataTransform.java @@ -68,6 +68,7 @@ import org.apache.sysml.runtime.instructions.mr.CSVReblockInstruction; import org.apache.sysml.runtime.instructions.spark.ParameterizedBuiltinSPInstruction; import org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.CSVReblockMR; import org.apache.sysml.runtime.matrix.CSVReblockMR.AssignRowIDMRReturn; import org.apache.sysml.runtime.matrix.JobReturn; @@ -98,10 +99,9 @@ public class DataTransform */ private static String readHeaderLine(FileSystem fs, CSVFileFormatProperties prop, String smallestFile) throws IOException { String line = null; - - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(smallestFile)))); - line = br.readLine(); - br.close(); + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(smallestFile)))) ) { + line = br.readLine(); + } if(prop.hasHeader()) { ; // nothing here } @@ -660,9 +660,10 @@ public class DataTransform String[] columnNames = Pattern.compile(Pattern.quote(delim)).split(header, -1); int ret = columnNames.length; - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(tfMtdPath + "/spec.json")))); - JSONObject spec = JSONHelper.parse(br); - br.close(); + JSONObject spec = null; + try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(tfMtdPath + "/spec.json"))))) { + spec = JSONHelper.parse(br); + } // fetch relevant attribute lists if ( !spec.containsKey(TfUtils.TXMETHOD_DUMMYCODE) ) @@ -680,16 +681,18 @@ public class DataTransform if ( TfUtils.checkValidInputFile(fs, binpath, false ) ) { - br = new BufferedReader(new InputStreamReader(fs.open(binpath))); - int nbins = UtilFunctions.parseToInt(br.readLine().split(TfUtils.TXMTD_SEP)[4]); - br.close(); + int nbins = -1; + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(binpath)))) { + nbins = UtilFunctions.parseToInt(br.readLine().split(TfUtils.TXMTD_SEP)[4]); + } ret += (nbins-1); } else if ( TfUtils.checkValidInputFile(fs, rcdpath, false ) ) { - br = new BufferedReader(new InputStreamReader(fs.open(rcdpath))); - int ndistinct = UtilFunctions.parseToInt(br.readLine()); - br.close(); + int ndistinct = -1; + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(rcdpath))) ) { + ndistinct = UtilFunctions.parseToInt(br.readLine()); + } ret += (ndistinct-1); } else @@ -938,17 +941,17 @@ public class DataTransform if(oprnds.isApply) { - BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.applyTxPath + "/" + TfUtils.TXMTD_COLNAMES)) )); - ret = br.readLine(); - br.close(); + try( BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.applyTxPath + "/" + TfUtils.TXMTD_COLNAMES)) )) ) { + ret = br.readLine(); + } } else { if ( oprnds.outNamesFile == null ) ret = headerLine; else { - BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.outNamesFile)) )); - ret = br.readLine(); - br.close(); + try( BufferedReader br = new BufferedReader(new InputStreamReader( fs.open(new Path(oprnds.outNamesFile)) )) ) { + ret = br.readLine(); + } } } @@ -1077,13 +1080,13 @@ public class DataTransform { for(int fileNo=0; fileNo<files.size(); fileNo++) { - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))); - if(fileNo==0 && prop.hasHeader() ) - br.readLine(); //ignore header - - while ( br.readLine() != null) - numRows++; - br.close(); + try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo))))) { + if(fileNo==0 && prop.hasHeader() ) + br.readLine(); //ignore header + + while ( br.readLine() != null) + numRows++; + } } numRowsTf = numRows; } @@ -1096,19 +1099,18 @@ public class DataTransform for(int fileNo=0; fileNo<files.size(); fileNo++) { - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))); - if(fileNo==0 && prop.hasHeader() ) - br.readLine(); //ignore header - - while ( (line=br.readLine()) != null) - { - numRows++; - - words = delim.split(line, -1); - if(!oa.omit(words, agents)) - numRowsTf++; + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))) ) { + if(fileNo==0 && prop.hasHeader() ) + br.readLine(); //ignore header + while ( (line=br.readLine()) != null) + { + numRows++; + + words = delim.split(line, -1); + if(!oa.omit(words, agents)) + numRowsTf++; + } } - br.close(); } } @@ -1162,20 +1164,19 @@ public class DataTransform String[] words = null; int numColumnsTf=0; - BufferedReader br = null; if (!isApply) { for(int fileNo=0; fileNo<files.size(); fileNo++) { - br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))); - if(fileNo==0 && prop.hasHeader() ) - br.readLine(); //ignore header - - line = null; - while ( (line = br.readLine()) != null) { - agents.prepareTfMtd(line); + try(BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))) ) { + if(fileNo==0 && prop.hasHeader() ) + br.readLine(); //ignore header + + line = null; + while ( (line = br.readLine()) != null) { + agents.prepareTfMtd(line); + } } - br.close(); } if(agents.getValid() == 0) @@ -1227,75 +1228,79 @@ public class DataTransform BufferedWriter out=new BufferedWriter(new OutputStreamWriter(fs.create(new Path(result.getFileName()),true))); StringBuilder sb = new StringBuilder(); - MatrixBlock mb = null; - if ( isBB ) - { - int estNNZ = (int)agents.getValid() * ncols; - mb = new MatrixBlock((int)agents.getValid(), numColumnsTf, estNNZ ); - - if ( mb.isInSparseFormat() ) - mb.allocateSparseRowsBlock(); - else - mb.allocateDenseBlock(); - } - - int rowID = 0; // rowid to be used in filling the matrix block - - for(int fileNo=0; fileNo<files.size(); fileNo++) - { - br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))); - if ( fileNo == 0 ) + try { + MatrixBlock mb = null; + if ( isBB ) { - if ( prop.hasHeader() ) - br.readLine(); // ignore the header line from data file + int estNNZ = (int)agents.getValid() * ncols; + mb = new MatrixBlock((int)agents.getValid(), numColumnsTf, estNNZ ); - //TODO: fix hard-wired header propagation to meta data column names - - String dcdHeader = _da.constructDummycodedHeader(headerLine, agents.getDelim()); - numColumnsTf = _da.genDcdMapsAndColTypes(fs, tfMtdPath, ncols, agents); - generateHeaderFiles(fs, tfMtdPath, headerLine, dcdHeader); + if ( mb.isInSparseFormat() ) + mb.allocateSparseRowsBlock(); + else + mb.allocateDenseBlock(); } - - line = null; - while ( (line = br.readLine()) != null) { - words = agents.getWords(line); - - if(!agents.omit(words)) - { - words = agents.apply(words); - if (isCSV) + int rowID = 0; // rowid to be used in filling the matrix block + + for(int fileNo=0; fileNo<files.size(); fileNo++) + { + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))) ) { + if ( fileNo == 0 ) { - out.write( agents.checkAndPrepOutputString(words, sb) ); - out.write("\n"); + if ( prop.hasHeader() ) + br.readLine(); // ignore the header line from data file + + //TODO: fix hard-wired header propagation to meta data column names + + String dcdHeader = _da.constructDummycodedHeader(headerLine, agents.getDelim()); + numColumnsTf = _da.genDcdMapsAndColTypes(fs, tfMtdPath, ncols, agents); + generateHeaderFiles(fs, tfMtdPath, headerLine, dcdHeader); } - if( isBB ) - { - agents.check(words); - for(int c=0; c<words.length; c++) + line = null; + while ( (line = br.readLine()) != null) { + words = agents.getWords(line); + + if(!agents.omit(words)) { - if(words[c] == null || words[c].isEmpty()) - ; - else - mb.appendValue(rowID, c, UtilFunctions.parseToDouble(words[c])); + words = agents.apply(words); + + if (isCSV) + { + out.write( agents.checkAndPrepOutputString(words, sb) ); + out.write("\n"); + } + + if( isBB ) + { + agents.check(words); + for(int c=0; c<words.length; c++) + { + if(words[c] == null || words[c].isEmpty()) + ; + else + mb.appendValue(rowID, c, UtilFunctions.parseToDouble(words[c])); + } + } + rowID++; } } - rowID++; } } - br.close(); - } - out.close(); - - if(mb != null) - { - mb.recomputeNonZeros(); - mb.examSparsity(); - result.acquireModify(mb); - result.release(); - result.exportData(); + if(mb != null) + { + mb.recomputeNonZeros(); + mb.examSparsity(); + + result.acquireModify(mb); + result.release(); + result.exportData(); + } + } + finally { + IOUtilFunctions.closeSilently(out); } MatrixCharacteristics mc = new MatrixCharacteristics(agents.getValid(), numColumnsTf, (int) result.getNumRowsPerBlock(), (int) result.getNumColumnsPerBlock()); @@ -1306,16 +1311,16 @@ public class DataTransform public static void generateHeaderFiles(FileSystem fs, String txMtdDir, String origHeader, String newHeader) throws IOException { // write out given header line - Path pt=new Path(txMtdDir+"/" + TfUtils.TXMTD_COLNAMES); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(origHeader+"\n"); - br.close(); + try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter( + fs.create(new Path(txMtdDir+"/" + TfUtils.TXMTD_COLNAMES),true)))) { + br.write(origHeader+"\n"); + } // write out the new header line (after all transformations) - pt = new Path(txMtdDir + "/" + TfUtils.TXMTD_DC_COLNAMES); - br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(newHeader+"\n"); - br.close(); + try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter( + fs.create(new Path(txMtdDir + "/" + TfUtils.TXMTD_DC_COLNAMES),true))) ){ + br.write(newHeader+"\n"); + } } private static void checkIfOutputOverlapsWithTxMtd(MatrixObject[] outputs, TransformOperands oprnds,
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java index d3b11d3..676b31e 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/DummycodeAgent.java @@ -131,48 +131,46 @@ public class DummycodeAgent extends Encoder _dcdColumnMap = new int[numCols]; - Path pt=new Path(txMtdDir+"/Dummycode/" + TfUtils.DCD_FILE_NAME); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - int sum=1; - int idx = 0; - for(int colID=1; colID <= numCols; colID++) - { - if ( _colList != null && idx < _colList.length && _colList[idx] == colID ) - { - br.write(colID + TfUtils.TXMTD_SEP + "1" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + (sum+_domainSizes[idx]-1) + "\n"); - _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1; - - for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++) - ctypes[i-1] = TfUtils.ColumnTypes.DUMMYCODED; - - sum += _domainSizes[idx]; - idx++; - } - else + try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter( + fs.create(new Path(txMtdDir+"/Dummycode/" + TfUtils.DCD_FILE_NAME),true))) ) { + int idx = 0; + for(int colID=1; colID <= numCols; colID++) { - br.write(colID + TfUtils.TXMTD_SEP + "0" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + sum + "\n"); - _dcdColumnMap[colID-1] = sum-1; - - if ( agents.getBinAgent().isApplicable(colID) != -1 ) - ctypes[sum-1] = TfUtils.ColumnTypes.ORDINAL; // binned variable results in an ordinal column - - if ( agents.getRecodeAgent().isApplicable(colID) != -1 ) - ctypes[sum-1] = TfUtils.ColumnTypes.NOMINAL; - - sum += 1; + if ( _colList != null && idx < _colList.length && _colList[idx] == colID ) + { + br.write(colID + TfUtils.TXMTD_SEP + "1" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + (sum+_domainSizes[idx]-1) + "\n"); + _dcdColumnMap[colID-1] = (sum+_domainSizes[idx]-1)-1; + + for(int i=sum; i <=(sum+_domainSizes[idx]-1); i++) + ctypes[i-1] = TfUtils.ColumnTypes.DUMMYCODED; + + sum += _domainSizes[idx]; + idx++; + } + else + { + br.write(colID + TfUtils.TXMTD_SEP + "0" + TfUtils.TXMTD_SEP + sum + TfUtils.TXMTD_SEP + sum + "\n"); + _dcdColumnMap[colID-1] = sum-1; + + if ( agents.getBinAgent().isApplicable(colID) != -1 ) + ctypes[sum-1] = TfUtils.ColumnTypes.ORDINAL; // binned variable results in an ordinal column + + if ( agents.getRecodeAgent().isApplicable(colID) != -1 ) + ctypes[sum-1] = TfUtils.ColumnTypes.NOMINAL; + + sum += 1; + } } } - br.close(); // Write coltypes.csv - pt=new Path(txMtdDir + File.separator + TfUtils.TXMTD_COLTYPES); - br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - - br.write(ctypes[0].toID() + ""); - for(int i = 1; i < _dummycodedLength; i++) - br.write( TfUtils.TXMTD_SEP + ctypes[i].toID() ); - br.close(); + try(BufferedWriter br=new BufferedWriter(new OutputStreamWriter( + fs.create(new Path(txMtdDir + File.separator + TfUtils.TXMTD_COLTYPES),true))) ) { + br.write(ctypes[0].toID() + ""); + for(int i = 1; i < _dummycodedLength; i++) + br.write( TfUtils.TXMTD_SEP + ctypes[i].toID() ); + } return sum-1; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java index 2e3fd75..01fc784 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java +++ b/src/main/java/org/apache/sysml/runtime/transform/GTFMTDReducer.java @@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.wink.json4j.JSONException; - +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; @@ -95,28 +95,31 @@ public class GTFMTDReducer implements Reducer<IntWritable, DistinctValue, Text, } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked","deprecation"}) private long generateOffsetsFile(ArrayList<OffsetCount> list) throws IllegalArgumentException, IOException { Collections.sort(list); - @SuppressWarnings("deprecation") - SequenceFile.Writer writer = new SequenceFile.Writer( + SequenceFile.Writer writer = null; + long lineOffset=0; + try { + writer = new SequenceFile.Writer( FileSystem.get(_rJob), _rJob, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class); - long lineOffset=0; - for(OffsetCount oc: list) - { - long count=oc.count; - oc.count=lineOffset; - writer.append(new ByteWritable((byte)0), oc); - lineOffset+=count; + for(OffsetCount oc: list) + { + long count=oc.count; + oc.count=lineOffset; + writer.append(new ByteWritable((byte)0), oc); + lineOffset+=count; + } + } + finally { + IOUtilFunctions.closeSilently(writer); } - writer.close(); list.clear(); - return lineOffset; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java index 1cb0f54..6e688ef 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java +++ b/src/main/java/org/apache/sysml/runtime/transform/GenTfMtdSPARK.java @@ -41,6 +41,7 @@ import org.apache.wink.json4j.JSONObject; import scala.Tuple2; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.Pair; @@ -173,7 +174,7 @@ public class GenTfMtdSPARK _agents = new TfUtils(headerLine, hasHeader, delim, nas, jspec, numCols, tfMtdDir, offsetFile, null); } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked","deprecation"}) @Override public Iterator<Long> call(Tuple2<Integer, Iterable<DistinctValue>> t) throws Exception { @@ -201,18 +202,20 @@ public class GenTfMtdSPARK list.add(new OffsetCount(iterDV.next().getOffsetCount())); Collections.sort(list); - @SuppressWarnings("deprecation") - SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class); - + SequenceFile.Writer writer = null; long lineOffset=0; - for(OffsetCount oc: list) - { - long count=oc.count; - oc.count=lineOffset; - writer.append(new ByteWritable((byte)0), oc); - lineOffset+=count; + try { + writer = new SequenceFile.Writer(fs, job, new Path(_agents.getOffsetFile()+"/part-00000"), ByteWritable.class, OffsetCount.class); + for(OffsetCount oc: list) { + long count=oc.count; + oc.count=lineOffset; + writer.append(new ByteWritable((byte)0), oc); + lineOffset+=count; + } + } + finally { + IOUtilFunctions.closeSilently(writer); } - writer.close(); list.clear(); numRows.add(lineOffset); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java index 1106099..97881e7 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/MVImputeAgent.java @@ -544,25 +544,25 @@ public class MVImputeAgent extends Encoder private void writeTfMtd(int colID, String mean, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { Path pt=new Path(tfMtdDir+"/Impute/"+ agents.getName(colID) + TfUtils.TXMTD_MV_FILE_SUFFIX); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(colID + TfUtils.TXMTD_SEP + mean + "\n"); - br.close(); + try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { + br.write(colID + TfUtils.TXMTD_SEP + mean + "\n"); + } } private void writeTfMtd(int colID, String mean, String sdev, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { Path pt=new Path(tfMtdDir+"/Scale/"+ agents.getName(colID) + TfUtils.SCALE_FILE_SUFFIX); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(colID + TfUtils.TXMTD_SEP + mean + TfUtils.TXMTD_SEP + sdev + "\n"); - br.close(); + try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { + br.write(colID + TfUtils.TXMTD_SEP + mean + TfUtils.TXMTD_SEP + sdev + "\n"); + } } private void writeTfMtd(int colID, String min, String max, String binwidth, String nbins, String tfMtdDir, FileSystem fs, TfUtils agents) throws IOException { Path pt = new Path(tfMtdDir+"/Bin/"+ agents.getName(colID) + TfUtils.TXMTD_BIN_FILE_SUFFIX); - BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); - br.close(); + try( BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { + br.write(colID + TfUtils.TXMTD_SEP + min + TfUtils.TXMTD_SEP + max + TfUtils.TXMTD_SEP + binwidth + TfUtils.TXMTD_SEP + nbins + "\n"); + } } public void outputTransformationMetadata(String outputDir, FileSystem fs, TfUtils agents) throws IOException { @@ -822,11 +822,11 @@ public class MVImputeAgent extends Encoder { Path path = new Path( txMtdDir + "/Impute/" + agents.getName(colID) + TfUtils.TXMTD_MV_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); - - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); - String line = br.readLine(); - String replacement = UtilFunctions.unquote(line.split(TfUtils.TXMTD_SEP)[1]); - br.close(); + String replacement = null; + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))) ) { + String line = br.readLine(); + replacement = UtilFunctions.unquote(line.split(TfUtils.TXMTD_SEP)[1]); + } return replacement; } @@ -835,9 +835,10 @@ public class MVImputeAgent extends Encoder { Path path = new Path( txMtdDir + "/Scale/" + agents.getName(colID) + TfUtils.SCALE_FILE_SUFFIX); TfUtils.checkValidInputFile(fs, path, true); - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); - String line = br.readLine(); - br.close(); + String line = null; + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))) ) { + line = br.readLine(); + } return line; } @@ -892,7 +893,6 @@ public class MVImputeAgent extends Encoder processScalingFile(i, _scnomvList, _scnomvMeanList, _scnomvVarList, fs, tfMtdDir, agents); } else { - fs.close(); throw new RuntimeException("Path to recode maps must be a directory: " + tfMtdDir); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java index a9da1f2..57ff608 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java +++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java @@ -38,6 +38,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.sysml.lops.Lop; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.Pair; @@ -221,44 +222,46 @@ public class RecodeAgent extends Encoder Path pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) + TfUtils.TXMTD_RCD_MAP_SUFFIX); BufferedWriter br=null; - if(isRecoded) - br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - - // remove NA strings - if ( agents.getNAStrings() != null) - for(String naword : agents.getNAStrings()) - map.remove(naword); - - if(fromCP) - map = handleMVConstant(colID, agents, map); - - if ( map.size() == 0 ) - throw new RuntimeException("Can not proceed since \"" + agents.getName(colID) + "\" (id=" + colID + ") contains only the missing values, and not a single valid value -- set imputation method to \"constant\"."); - - // Order entries by category (string) value - List<String> newNames = new ArrayList<String>(map.keySet()); - Collections.sort(newNames); - - for(String w : newNames) { //map.keySet()) { - count = map.get(w); - ++rcdIndex; - - // output (w, count, rcdIndex) - if(br != null) - br.write(UtilFunctions.quote(w) + TfUtils.TXMTD_SEP + rcdIndex + TfUtils.TXMTD_SEP + count + "\n"); - - if(maxCount < count) { - maxCount = count; - mode = w; - modeIndex = rcdIndex; - } - - // Replace count with recode index (useful when invoked from CP) - map.put(w, (long)rcdIndex); + try { + if(isRecoded) + br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); + + // remove NA strings + if ( agents.getNAStrings() != null) + for(String naword : agents.getNAStrings()) + map.remove(naword); + + if(fromCP) + map = handleMVConstant(colID, agents, map); + + if ( map.size() == 0 ) + throw new RuntimeException("Can not proceed since \"" + agents.getName(colID) + "\" (id=" + colID + ") contains only the missing values, and not a single valid value -- set imputation method to \"constant\"."); + + // Order entries by category (string) value + List<String> newNames = new ArrayList<String>(map.keySet()); + Collections.sort(newNames); + + for(String w : newNames) { //map.keySet()) { + count = map.get(w); + ++rcdIndex; + + // output (w, count, rcdIndex) + if(br != null) + br.write(UtilFunctions.quote(w) + TfUtils.TXMTD_SEP + rcdIndex + TfUtils.TXMTD_SEP + count + "\n"); + + if(maxCount < count) { + maxCount = count; + mode = w; + modeIndex = rcdIndex; + } + + // Replace count with recode index (useful when invoked from CP) + map.put(w, (long)rcdIndex); + } + } + finally { + IOUtilFunctions.closeSilently(br); } - - if(br != null) - br.close(); if ( mode == null ) { mode = ""; @@ -269,23 +272,23 @@ public class RecodeAgent extends Encoder { // output mode pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) + TfUtils.MODE_FILE_SUFFIX); - br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(UtilFunctions.quote(mode) + "," + modeIndex + "," + maxCount ); - br.close(); + try(BufferedWriter br2=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { + br2.write(UtilFunctions.quote(mode) + "," + modeIndex + "," + maxCount ); + } // output number of distinct values pt=new Path(outputDir+"/Recode/"+ agents.getName(colID) + TfUtils.TXMTD_RCD_DISTINCT_SUFFIX); - br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(""+map.size()); - br.close(); + try(BufferedWriter br2=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { + br2.write(""+map.size()); + } } if (isModeImputed) { pt=new Path(outputDir+"/Impute/"+ agents.getName(colID) + TfUtils.TXMTD_MV_FILE_SUFFIX); - br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - br.write(colID + "," + UtilFunctions.quote(mode)); - br.close(); + try( BufferedWriter br2=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)))) { + br2.write(colID + "," + UtilFunctions.quote(mode)); + } } } @@ -347,20 +350,18 @@ public class RecodeAgent extends Encoder HashMap<String,String> map = new HashMap<String,String>(); Pair<String,String> pair = new Pair<String,String>(); - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))); String line = null; - - // Example line to parse: "WN (1)67492",1,61975 - while((line=br.readLine())!=null) { - DecoderRecode.parseRecodeMapEntry(line, pair); - map.put(pair.getKey(), pair.getValue()); + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path))) ) { + // Example line to parse: "WN (1)67492",1,61975 + while((line=br.readLine())!=null) { + DecoderRecode.parseRecodeMapEntry(line, pair); + map.put(pair.getKey(), pair.getValue()); + } } - br.close(); _finalMaps.put(colID, map); } } else { - fs.close(); throw new RuntimeException("Path to recode maps must be a directory: " + txMtdDir); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java index cb35aec..58868e0 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java +++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java @@ -41,6 +41,7 @@ import org.apache.sysml.lops.Lop; import org.apache.sysml.parser.DataExpression; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.matrix.CSVReblockMR; import org.apache.sysml.runtime.matrix.CSVReblockMR.OffsetCount; @@ -524,19 +525,22 @@ public class TfUtils implements Serializable{ */ public String getPartFileID(JobConf job, long offset) throws IOException { - Reader reader = initOffsetsReader(job); - - ByteWritable key=new ByteWritable(); - OffsetCount value=new OffsetCount(); - String thisFile = TfUtils.getPartFileName(job); - + Reader reader = null; int id = 0; - while (reader.next(key, value)) { - if ( thisFile.equals(value.filename) && value.fileOffset == offset ) - break; - id++; + try { + reader = initOffsetsReader(job); + ByteWritable key=new ByteWritable(); + OffsetCount value=new OffsetCount(); + String thisFile = TfUtils.getPartFileName(job); + while (reader.next(key, value)) { + if ( thisFile.equals(value.filename) && value.fileOffset == offset ) + break; + id++; + } + } + finally { + IOUtilFunctions.closeSilently(reader); } - reader.close(); String sid = Integer.toString(id); char[] carr = new char[5-sid.length()]; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java index 7a28de7..cd32a0c 100644 --- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java +++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java @@ -47,6 +47,7 @@ import org.apache.sysml.parser.DataExpression; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.io.MatrixReaderFactory; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; @@ -109,17 +110,36 @@ public class MapReduceTool } public static boolean existsFileOnHDFS(String fname){ - boolean ret = true; - try{ - Path outpath = new Path(fname); - ret = FileSystem.get(_rJob).exists(outpath); + try { + return FileSystem.get(_rJob) + .exists(new Path(fname)); } - catch(Exception ex) - { - LOG.error("Exception caught in existsFileOnHDFS", ex); - ret = false; + catch(Exception ex) { + LOG.error("Failed check existsFileOnHDFS.", ex); } - return ret; + return false; + } + + public static boolean isDirectory(String fname) { + try { + return FileSystem.get(_rJob) + .isDirectory(new Path(fname)); + } + catch(Exception ex) { + LOG.error("Failed check isDirectory.", ex); + } + return false; + } + + public static FileStatus[] getDirectoryListing(String fname) { + try { + return FileSystem.get(_rJob) + .listStatus(new Path(fname)); + } + catch(Exception ex) { + LOG.error("Failed listing of directory contents.", ex); + } + return new FileStatus[0]; } public static void deleteFileWithMTDIfExistOnHDFS(String fname) throws IOException { @@ -254,24 +274,25 @@ public class MapReduceTool public static String readStringFromHDFSFile(String filename) throws IOException { - BufferedReader br = setupInputFile(filename); - // handle multi-line strings in the HDFS file StringBuilder sb = new StringBuilder(); - String line = null; - while ( (line = br.readLine()) != null ) { - sb.append(line); - sb.append("\n"); + try( BufferedReader br = setupInputFile(filename) ) { + // handle multi-line strings in the HDFS file + String line = null; + while ( (line = br.readLine()) != null ) { + sb.append(line); + sb.append("\n"); + } } - br.close(); //return string without last character return sb.substring(0, sb.length()-1); } public static Object readObjectFromHDFSFile(String filename, ValueType vt) throws IOException { - BufferedReader br = setupInputFile(filename); - String line = br.readLine(); - br.close(); + String line = null; + try( BufferedReader br = setupInputFile(filename) ) { + line = br.readLine(); + } if( line == null ) throw new IOException("Empty file on hdfs: "+filename); @@ -307,24 +328,24 @@ public class MapReduceTool } public static void writeObjectToHDFS ( Object obj, String filename ) throws IOException { - BufferedWriter br = setupOutputFile(filename); - br.write(obj.toString()); - br.close(); + try( BufferedWriter br = setupOutputFile(filename) ) { + br.write(obj.toString()); + } } public static void writeDimsFile ( String filename, byte[] unknownFlags, long[] maxRows, long[] maxCols) throws IOException { - BufferedWriter br = setupOutputFile(filename); - StringBuilder line = new StringBuilder(); - for ( int i=0; i < unknownFlags.length; i++ ) { - if ( unknownFlags[i] != (byte)0 ) { - line.append(i); - line.append(" " + maxRows[i]); - line.append(" " + maxCols[i]); - line.append("\n"); - } + try( BufferedWriter br = setupOutputFile(filename) ) { + StringBuilder line = new StringBuilder(); + for ( int i=0; i < unknownFlags.length; i++ ) { + if ( unknownFlags[i] != (byte)0 ) { + line.append(i); + line.append(" " + maxRows[i]); + line.append(" " + maxCols[i]); + line.append("\n"); + } + } + br.write(line.toString()); } - br.write(line.toString()); - br.close(); } public static MatrixCharacteristics[] processDimsFiles(String dir, MatrixCharacteristics[] stats) @@ -343,21 +364,18 @@ public class MapReduceTool FileStatus[] files = fs.listStatus(pt); for ( int i=0; i < files.length; i++ ) { Path filePath = files[i].getPath(); - //System.out.println("Processing dims file: " + filePath.toString()); - BufferedReader br = setupInputFile(filePath.toString()); - - String line = ""; - while((line=br.readLine()) != null ) { - String[] parts = line.split(" "); - int resultIndex = Integer.parseInt(parts[0]); - long maxRows = Long.parseLong(parts[1]); - long maxCols = Long.parseLong(parts[2]); - - stats[resultIndex].setDimension( (stats[resultIndex].getRows() < maxRows ? maxRows : stats[resultIndex].getRows()), - (stats[resultIndex].getCols() < maxCols ? maxCols : stats[resultIndex].getCols()) ); + try( BufferedReader br = setupInputFile(filePath.toString()) ) { + String line = ""; + while((line=br.readLine()) != null ) { + String[] parts = line.split(" "); + int resultIndex = Integer.parseInt(parts[0]); + long maxRows = Long.parseLong(parts[1]); + long maxCols = Long.parseLong(parts[2]); + + stats[resultIndex].setDimension( (stats[resultIndex].getRows() < maxRows ? maxRows : stats[resultIndex].getRows()), + (stats[resultIndex].getCols() < maxCols ? maxCols : stats[resultIndex].getCols()) ); + } } - - br.close(); } } else @@ -389,12 +407,9 @@ public class MapReduceTool { Path pt = new Path(mtdfile); FileSystem fs = FileSystem.get(_rJob); - BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - - try { + try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { String mtd = metaDataToString(vt, schema, dt, mc, outinfo, formatProperties); br.write(mtd); - br.close(); } catch (Exception e) { throw new IOException("Error creating and writing metadata JSON file", e); } @@ -405,12 +420,9 @@ public class MapReduceTool { Path pt = new Path(mtdfile); FileSystem fs = FileSystem.get(_rJob); - BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))); - - try { + try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(pt,true))) ) { String mtd = metaDataToString(vt, null, DataType.SCALAR, null, OutputInfo.TextCellOutputInfo, null); br.write(mtd); - br.close(); } catch (Exception e) { throw new IOException("Error creating and writing metadata JSON file", e); @@ -558,37 +570,43 @@ public class MapReduceTool throw new RuntimeException("cannot read partition "+currentPart); int buffsz = 64 * 1024; - FSDataInputStream currentStream=fs.open(fileToRead, buffsz); - DoubleWritable readKey=new DoubleWritable(); + DoubleWritable readKey=new DoubleWritable(); IntWritable readValue=new IntWritable(); - - boolean contain0s=false; - long numZeros=0; - if(currentPart==metadata.getPartitionOfZero()) - { - contain0s=true; - numZeros=metadata.getNumberOfZero(); + FSDataInputStream currentStream = null; + double ret = -1; + try { + currentStream = fs.open(fileToRead, buffsz); + + boolean contain0s=false; + long numZeros=0; + if(currentPart==metadata.getPartitionOfZero()) + { + contain0s=true; + numZeros=metadata.getNumberOfZero(); + } + ReadWithZeros reader=new ReadWithZeros(currentStream, contain0s, numZeros); + + int numRead=0; + while(numRead<=offset) + { + reader.readNextKeyValuePairs(readKey, readValue); + numRead+=readValue.get(); + cum_weight += readValue.get(); + } + + ret = readKey.get(); + if(average) { + if(numRead<=offset+1) { + reader.readNextKeyValuePairs(readKey, readValue); + cum_weight += readValue.get(); + ret = (ret+readKey.get())/2; + } + } } - ReadWithZeros reader=new ReadWithZeros(currentStream, contain0s, numZeros); - - int numRead=0; - while(numRead<=offset) - { - reader.readNextKeyValuePairs(readKey, readValue); - numRead+=readValue.get(); - cum_weight += readValue.get(); + finally { + IOUtilFunctions.closeSilently(currentStream); } - - double ret = readKey.get(); - if(average) { - if(numRead<=offset+1) { - reader.readNextKeyValuePairs(readKey, readValue); - cum_weight += readValue.get(); - ret = (ret+readKey.get())/2; - } - } - currentStream.close(); - return new double[] {ret, (average ? -1 : readValue.get()), (average ? -1 : cum_weight)}; + return new double[] {ret, (average ? -1 : readValue.get()), (average ? -1 : cum_weight)}; } public static void createDirIfNotExistOnHDFS(String dir, String permissions) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/utils/InstallDependencyForIntegrationTests.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/utils/InstallDependencyForIntegrationTests.java b/src/main/java/org/apache/sysml/utils/InstallDependencyForIntegrationTests.java index 1eb8e6f..deacf11 100644 --- a/src/main/java/org/apache/sysml/utils/InstallDependencyForIntegrationTests.java +++ b/src/main/java/org/apache/sysml/utils/InstallDependencyForIntegrationTests.java @@ -161,12 +161,12 @@ public class InstallDependencyForIntegrationTests { try { System.out.println("Running script: " + dmlScriptFile + "\n"); System.out.println("******************* R script *******************"); - BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(dmlScriptFile))); - String content; - while ((content = in.readLine()) != null) { - System.out.println(content); + try( BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(dmlScriptFile)))) { + String content; + while ((content = in.readLine()) != null) { + System.out.println(content); + } } - in.close(); System.out.println("**************************************************\n\n"); } catch (IOException e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/utils/ParameterBuilder.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/utils/ParameterBuilder.java b/src/main/java/org/apache/sysml/utils/ParameterBuilder.java index ae01bac..97c7b58 100644 --- a/src/main/java/org/apache/sysml/utils/ParameterBuilder.java +++ b/src/main/java/org/apache/sysml/utils/ParameterBuilder.java @@ -32,6 +32,8 @@ import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.sysml.runtime.io.IOUtilFunctions; + /** * Class to help setting variables in a script. */ @@ -58,33 +60,40 @@ public class ParameterBuilder String strScript = strScriptPathName; String strTmpScript = strScript + "t"; - BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(strScript))); - FileOutputStream out = new FileOutputStream(strTmpScript); - PrintWriter pw = new PrintWriter(out); - String content; - Pattern unresolvedVars = Pattern.compile(_RS + ".*" + _RS); - /* - * so that variables, which were not assigned, are replaced by an - * empty string - */ - while ((content = in.readLine()) != null) { - for (Entry<String, String> e : variables.entrySet() ) { - String variable = e.getKey(); - String val = e.getValue(); - Pattern pattern = Pattern.compile(_RS + variable + _RS); - Matcher matcher = pattern.matcher(content); - while (matcher.find()) { - content = content.replaceFirst(matcher.group().replace("$", "\\$"), val); + BufferedReader in = null; + PrintWriter pw = null; + + try { + in = new BufferedReader(new InputStreamReader(new FileInputStream(strScript))); + pw = new PrintWriter(new FileOutputStream(strTmpScript)); + + String content; + Pattern unresolvedVars = Pattern.compile(_RS + ".*" + _RS); + /* + * so that variables, which were not assigned, are replaced by an + * empty string + */ + while ((content = in.readLine()) != null) { + for (Entry<String, String> e : variables.entrySet() ) { + String variable = e.getKey(); + String val = e.getValue(); + Pattern pattern = Pattern.compile(_RS + variable + _RS); + Matcher matcher = pattern.matcher(content); + while (matcher.find()) { + content = content.replaceFirst(matcher.group().replace("$", "\\$"), val); + } } + Matcher matcher = unresolvedVars.matcher(content); + content = matcher.replaceAll(""); + pw.println(content); } - Matcher matcher = unresolvedVars.matcher(content); - content = matcher.replaceAll(""); - pw.println(content); } - pw.close(); - out.close(); - in.close(); - } catch (IOException e) { + finally { + IOUtilFunctions.closeSilently(pw); + IOUtilFunctions.closeSilently(in); + } + } + catch (IOException e) { fail("unable to set variables in dml script: " + e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java b/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java index 778a278..8954951 100644 --- a/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java +++ b/src/main/java/org/apache/sysml/yarn/DMLAppMaster.java @@ -137,9 +137,9 @@ public class DMLAppMaster //write given message to hdfs try { FileSystem fs = FileSystem.get(_conf); - FSDataOutputStream fout = fs.create(msgPath, true); - fout.writeBytes( msg ); - fout.close(); + try( FSDataOutputStream fout = fs.create(msgPath, true) ) { + fout.writeBytes( msg ); + } LOG.debug("Stop message written to HDFS file: "+msgPath ); } catch(Exception ex) { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java index dd73262..bd9116d 100644 --- a/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java +++ b/src/main/java/org/apache/sysml/yarn/DMLYarnClient.java @@ -29,7 +29,6 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -277,18 +276,17 @@ public class DMLYarnClient //(runtime plan migration during resource reoptimizations now needs to use qualified names //for shipping/reading intermediates) TODO modify resource reoptimizer on prototype integration. Path confPath = new Path(hdfsWD, DML_CONFIG_NAME); - FSDataOutputStream fout = fs.create(confPath, true); - //_dmlConfig.makeQualifiedScratchSpacePath(); - fout.writeBytes(_dmlConfig.serializeDMLConfig() + "\n"); - fout.close(); + try( FSDataOutputStream fout = fs.create(confPath, true) ) { + fout.writeBytes(_dmlConfig.serializeDMLConfig() + "\n"); + } _hdfsDMLConfig = confPath.makeQualified(fs).toString(); LOG.debug("DML config written to HDFS file: "+_hdfsDMLConfig+""); //serialize the dml script to HDFS file Path scriptPath = new Path(hdfsWD, DML_SCRIPT_NAME); - FSDataOutputStream fout2 = fs.create(scriptPath, true); - fout2.writeBytes(_dmlScript); - fout2.close(); + try( FSDataOutputStream fout2 = fs.create(scriptPath, true) ) { + fout2.writeBytes(_dmlScript); + } _hdfsDMLScript = scriptPath.makeQualified(fs).toString(); LOG.debug("DML script written to HDFS file: "+_hdfsDMLScript+""); @@ -524,10 +522,9 @@ public class DMLYarnClient FileSystem fs = FileSystem.get(yconf); if( fs.exists(msgPath) ) { - FSDataInputStream fin = fs.open(msgPath); - BufferedReader br = new BufferedReader(new InputStreamReader(fin)); - ret = br.readLine(); - fin.close(); + try( BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(msgPath))) ) { + ret = br.readLine(); + } LOG.debug("Stop message read from HDFS file "+msgPath+": "+ret ); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameCastingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameCastingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameCastingTest.java index d4f1c33..22e96f3 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameCastingTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameCastingTest.java @@ -30,6 +30,7 @@ import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; @@ -158,10 +159,8 @@ public class FrameCastingTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } System.out.println("JMLC scoring w/ "+nRuns+" runs in "+time.stop()+"ms."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameDecodeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameDecodeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameDecodeTest.java index eab1f5d..c9c973e 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameDecodeTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameDecodeTest.java @@ -30,6 +30,7 @@ import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; @@ -159,10 +160,8 @@ public class FrameDecodeTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } System.out.println("JMLC scoring w/ "+nRuns+" runs in "+time.stop()+"ms."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameEncodeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameEncodeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameEncodeTest.java index aed90f3..a615771 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameEncodeTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameEncodeTest.java @@ -30,6 +30,7 @@ import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; @@ -157,10 +158,8 @@ public class FrameEncodeTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } System.out.println("JMLC scoring w/ "+nRuns+" runs in "+time.stop()+"ms."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameIndexingAppendTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameIndexingAppendTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameIndexingAppendTest.java index 3b778c5..f8a9ef1 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameIndexingAppendTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameIndexingAppendTest.java @@ -30,6 +30,7 @@ import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; @@ -160,10 +161,8 @@ public class FrameIndexingAppendTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } System.out.println("JMLC scoring w/ "+nRuns+" runs in "+time.stop()+"ms."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameLeftIndexingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameLeftIndexingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameLeftIndexingTest.java index b1c91df..1b55a0a 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameLeftIndexingTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameLeftIndexingTest.java @@ -30,6 +30,7 @@ import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; @@ -160,10 +161,8 @@ public class FrameLeftIndexingTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } System.out.println("JMLC scoring w/ "+nRuns+" runs in "+time.stop()+"ms."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java index e1b3143..0de152c 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameReadMetaTest.java @@ -162,10 +162,8 @@ public class FrameReadMetaTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java index 7e78c9a..d1662e4 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/FrameTransformTest.java @@ -32,6 +32,7 @@ import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.lops.Lop; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; @@ -160,10 +161,8 @@ public class FrameTransformTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } System.out.println("JMLC scoring w/ "+nRuns+" runs in "+time.stop()+"ms."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/jmlc/ReuseModelVariablesTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/ReuseModelVariablesTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/ReuseModelVariablesTest.java index 2cc4449..6892767 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/ReuseModelVariablesTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/ReuseModelVariablesTest.java @@ -29,6 +29,7 @@ import org.apache.sysml.api.jmlc.Connection; import org.apache.sysml.api.jmlc.PreparedScript; import org.apache.sysml.api.jmlc.ResultVariables; import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; +import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; @@ -178,10 +179,8 @@ public class ReuseModelVariablesTest extends AutomatedTestBase ex.printStackTrace(); throw new IOException(ex); } - finally - { - if( conn != null ) - conn.close(); + finally { + IOUtilFunctions.closeSilently(conn); } System.out.println("JMLC scoring w/ "+nRuns+" runs in "+time.stop()+"ms."); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/misc/DataTypeChangeTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/misc/DataTypeChangeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/misc/DataTypeChangeTest.java index 8d30240..42e6c05 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/misc/DataTypeChangeTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/misc/DataTypeChangeTest.java @@ -190,11 +190,11 @@ public class DataTypeChangeTest extends AutomatedTestBase HashMap<String, String> argVals = new HashMap<String,String>(); //read script - BufferedReader in = new BufferedReader(new FileReader(fullTestName)); - String s1 = null; - while ((s1 = in.readLine()) != null) - dmlScriptString += s1 + "\n"; - in.close(); + try( BufferedReader in = new BufferedReader(new FileReader(fullTestName)) ) { + String s1 = null; + while ((s1 = in.readLine()) != null) + dmlScriptString += s1 + "\n"; + } //parsing and dependency analysis AParserWrapper parser = AParserWrapper.createParser(false); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java index 9f14bdc..f58d747 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForDependencyAnalysisTest.java @@ -344,11 +344,11 @@ public class ParForDependencyAnalysisTest extends AutomatedTestBase HashMap<String, String> argVals = new HashMap<String,String>(); //read script - BufferedReader in = new BufferedReader(new FileReader(HOME + scriptFilename)); - String s1 = null; - while ((s1 = in.readLine()) != null) - dmlScriptString += s1 + "\n"; - in.close(); + try( BufferedReader in = new BufferedReader(new FileReader(HOME + scriptFilename)) ) { + String s1 = null; + while ((s1 = in.readLine()) != null) + dmlScriptString += s1 + "\n"; + } //parsing and dependency analysis AParserWrapper parser = AParserWrapper.createParser(false); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/795a94e7/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java index d0d351c..b66813d 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/ScalingTest.java @@ -143,9 +143,9 @@ public class ScalingTest extends AutomatedTestBase outputSpec.put(TfUtils.TXMETHOD_SCALE, scaleSpec); FileSystem fs = FileSystem.get(TestUtils.conf); - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(specFile),true))); - out.write(outputSpec.toString()); - out.close(); + try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(specFile),true))) ) { + out.write(outputSpec.toString()); + } } @@ -158,9 +158,9 @@ public class ScalingTest extends AutomatedTestBase mtd.put("header", false); FileSystem fs = FileSystem.get(TestUtils.conf); - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(datafile+".mtd"),true))); - out.write(mtd.toString()); - out.close(); + try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(datafile+".mtd"),true))) ) { + out.write(mtd.toString()); + } } /**