Repository: incubator-systemml Updated Branches: refs/heads/master ce9392741 -> 7f8716b64
Fix mr iqm pick range record reader (current part weights), incl cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2fa69cd6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2fa69cd6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2fa69cd6 Branch: refs/heads/master Commit: 2fa69cd668e76e90baabfb2dcfdf7c549f6b3a2f Parents: ce93927 Author: Matthias Boehm <[email protected]> Authored: Sat Feb 13 17:46:58 2016 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Feb 13 17:46:58 2016 -0800 ---------------------------------------------------------------------- .../matrix/sort/PickFromCompactInputFormat.java | 191 +++++++------------ 1 file changed, 72 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2fa69cd6/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java index 7e8aef0..d03afc8 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java @@ -48,8 +48,7 @@ import org.apache.sysml.runtime.matrix.data.Pair; //key class to read has to be DoubleWritable public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, MatrixCell> -{ - +{ public static final String VALUE_IS_WEIGHT="value.is.weight"; public static final String INPUT_IS_VECTOR="input.is.vector"; public static final String SELECTED_RANGES="selected.ranges"; @@ -191,11 +190,9 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M wt += counts[partID]; } sb.append(partID+"," + (wt-counts[partID]) + ";"); - sb.append(sumwt + "," + lbound + "," + ubound); - //System.out.println("range string: " + sb.toString()); + job.set(SELECTED_RANGES, sb.toString()); - job.setBoolean(INPUT_IS_VECTOR, false); } @@ -222,111 +219,89 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M public static class RangePickRecordReader implements RecordReader<MatrixIndexes, MatrixCell> { - //private boolean valueIsWeight=true; - protected long totLength; - protected FileSystem fs; - protected Path path; - protected JobConf conf; + private Path path; - protected FSDataInputStream currentStream; - + private FSDataInputStream currentStream; private int startPos=0; private int numToRead=0; - DoubleWritable readKey=new DoubleWritable(); - Writable readValue = new IntWritable(0); + private DoubleWritable readKey=new DoubleWritable(); + private Writable readValue = new IntWritable(0); private boolean noRecordsNeeded=false; private int rawKeyValuesRead=0; private int index=0; - //private int currentRepeat=0; + private int beginPart=-1, endPart=-1, currPart=-1; + private double sumwt = 0.0, readWt; // total weight (set in JobConf) + private double lbound, ubound; + private HashMap<Integer,Double> partWeights = null; + private boolean isFirstRecord = true; + private ReadWithZeros reader=null; - int beginPart=-1, endPart=-1, currPart=-1; - double sumwt = 0.0, readWt, wtUntilCurrPart; // total weight (set in JobConf) - double lbound, ubound; - double[] partWeights = null; - boolean isFirstRecord = true; - - - //to handle zeros - ReadWithZeros reader=null; - /*private boolean contain0s=false; - private boolean justFound0=false; - private DoubleWritable keyAfterZero; - private Writable valueAfterZero; - private long numZeros=0;*/ - - private int getIndexInTheArray(String name) - { - int i=name.indexOf("part-"); - assert(i>=0); - return Integer.parseInt(name.substring(i+5)); - } - - private void parseSelectedRangeString(String str) { - String[] f1 = str.split(";"); - String[] f2 = null; - - // Each field of the form: "pid,wt" where wt is the total wt until the part pid - partWeights = new double[f1.length-1]; - for(int i=0; i < f1.length-1; i++) { - f2 = f1[i].split(","); - if(i==0) { - beginPart = Integer.parseInt(f2[0]); - } - if (i==f1.length-2) { - endPart = Integer.parseInt(f2[0]); - } - partWeights[i] = Double.parseDouble(f2[1]); - } - - // last field: "sumwt, lbound, ubound" - f2 = f1[f1.length-1].split(","); - sumwt = Double.parseDouble(f2[0]); - lbound = Double.parseDouble(f2[1]); - ubound = Double.parseDouble(f2[2]); - } - @SuppressWarnings("unchecked") - public RangePickRecordReader(JobConf job, FileSplit split) throws IOException { + public RangePickRecordReader(JobConf job, FileSplit split) + throws IOException + { parseSelectedRangeString(job.get(SELECTED_RANGES)); // check if the current part file needs to be processed path = split.getPath(); - totLength = split.getLength(); currentStream = FileSystem.get(job).open(path); currPart = getIndexInTheArray(path.getName()); - //System.out.println("RangePickRecordReader(): sumwt " + sumwt + " currPart " + currPart + " partRange [" + beginPart + "," + endPart + "]"); if ( currPart < beginPart || currPart > endPart ) { - System.out.println(" currPart is out of range. Skipping part!"); noRecordsNeeded = true; return; } Class<? extends Writable> valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class); try { - // readKey=keyClass.newInstance(); readValue=valueClass.newInstance(); } catch (Exception e) { throw new RuntimeException(e); } - //valueIsWeight=job.getBoolean(VALUE_IS_WEIGHT, true); int part0=job.getInt(PARTITION_OF_ZERO, -1); boolean contain0s=false; long numZeros =0; - if(part0==currPart) - { + if(part0==currPart) { contain0s = true; numZeros = job.getLong(NUMBER_OF_ZERO, 0); } reader=new ReadWithZeros(currentStream, contain0s, numZeros); } - public boolean next(MatrixIndexes key, MatrixCell value) throws IOException { - assert(currPart!=-1); + private int getIndexInTheArray(String name) { + int i=name.indexOf("part-"); + return Integer.parseInt(name.substring(i+5)); + } + + private void parseSelectedRangeString(String str) { + String[] f1 = str.split(";"); + String[] f2 = null; + // Each field of the form: "pid,wt" where wt is the total wt until the part pid + partWeights = new HashMap<Integer,Double>(); + for(int i=0; i < f1.length-1; i++) { + f2 = f1[i].split(","); + if(i==0) { + beginPart = Integer.parseInt(f2[0]); + } + if (i==f1.length-2) { + endPart = Integer.parseInt(f2[0]); + } + partWeights.put(i, Double.parseDouble(f2[1])); + } + + // last field: "sumwt, lbound, ubound" + f2 = f1[f1.length-1].split(","); + sumwt = Double.parseDouble(f2[0]); + lbound = Double.parseDouble(f2[1]); + ubound = Double.parseDouble(f2[2]); + } + + public boolean next(MatrixIndexes key, MatrixCell value) + throws IOException + { if(noRecordsNeeded) - // this part file does not fall within the required range of values return false; double qLowerD = sumwt*lbound; // lower quantile in double @@ -334,15 +309,14 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M // weight until current part if (isFirstRecord) { - readWt = partWeights[currPart]; - wtUntilCurrPart = partWeights[currPart]; + if( !partWeights.containsKey(currPart) ) + return false; //noRecordsNeeded + readWt = partWeights.get(currPart); isFirstRecord = false; } double tmpWt = 0; if ( currPart == beginPart || currPart == endPart ) { - //readWt = partWeights[currPart]; - reader.readNextKeyValuePairs(readKey, (IntWritable)readValue); tmpWt = ((IntWritable)readValue).get(); @@ -356,14 +330,13 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M key.setIndexes(++index,1); value.setValue(readKey.get()*tmpWt); readWt += tmpWt; - //System.out.println("currpart " + currPart + ", return (" + index +",1): " + readKey.get()*tmpWt + "| [" + readKey.get() + "," + tmpWt +"] readWt=" + readWt); return true; } else { return false; } } - else { //if(currPart != beginPart && currPart != endPart) { + else { // read full part reader.readNextKeyValuePairs(readKey, (IntWritable)readValue); tmpWt = ((IntWritable)readValue).get(); @@ -372,7 +345,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M value.setValue(readKey.get()*tmpWt); readWt += tmpWt; - //System.out.println("currpart " + currPart + ", return (" + index +",1): " + readKey.get()*tmpWt + "| [" + readKey.get() + "," + tmpWt +"] readWt=" + readWt); return true; } } @@ -408,68 +380,58 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M } } + /** + * + */ public static class PickRecordReader implements RecordReader<MatrixIndexes, MatrixCell> { private boolean valueIsWeight=true; - protected long totLength; - protected FileSystem fs; - protected Path path; - protected JobConf conf; - - protected FSDataInputStream currentStream; + private FileSystem fs; + private Path path; + + private FSDataInputStream currentStream; private int posIndex=0; private int[] pos=null; //starting from 0 private int[] indexes=null; - DoubleWritable readKey=new DoubleWritable(); - Writable readValue; + private DoubleWritable readKey=new DoubleWritable(); + private Writable readValue; private int numRead=0; private boolean noRecordsNeeded=false; - - //to handle zeros ReadWithZeros reader=null; - /*private boolean contain0s=false; - private boolean justFound0=false; - private DoubleWritable keyAfterZero; - private Writable valueAfterZero; - private long numZeros=0;*/ private int getIndexInTheArray(String name) { int i=name.indexOf("part-"); - assert(i>=0); return Integer.parseInt(name.substring(i+5)); } @SuppressWarnings("unchecked") public PickRecordReader(JobConf job, FileSplit split) - throws IOException{ + throws IOException + { fs = FileSystem.get(job); path = split.getPath(); - totLength = split.getLength(); currentStream = fs.open(path); int partIndex=getIndexInTheArray(path.getName()); String arrStr=job.get(SELECTED_POINTS_PREFIX+partIndex); - //System.out.println("read back in the recordreader: "+arrStr); - if(arrStr==null || arrStr.isEmpty()) - { + if(arrStr==null || arrStr.isEmpty()) { noRecordsNeeded=true; return; } + String[] strs=arrStr.split(","); pos=new int[strs.length]; indexes=new int[strs.length]; - for(int i=0; i<strs.length; i++) - { + for(int i=0; i<strs.length; i++) { String[] temp=strs[i].split(":"); pos[i]=Integer.parseInt(temp[0]); indexes[i]=Integer.parseInt(temp[1]); } - // Class<? extends WritableComparable> keyClass=(Class<? extends WritableComparable>) job.getClass(KEY_CLASS, WritableComparable.class); + Class<? extends Writable> valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class); try { - // readKey=keyClass.newInstance(); - readValue=valueClass.newInstance(); + readValue=valueClass.newInstance(); } catch (Exception e) { throw new RuntimeException(e); } @@ -478,8 +440,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M int part0=job.getInt(PARTITION_OF_ZERO, -1); boolean contain0s=false; long numZeros =0; - if(part0==partIndex) - { + if(part0==partIndex) { contain0s = true; numZeros = job.getLong(NUMBER_OF_ZERO, 0); } @@ -487,21 +448,15 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M } public boolean next(MatrixIndexes key, MatrixCell value) - throws IOException { - - if(noRecordsNeeded || posIndex>=pos.length) - { - //System.out.println("return false"); - //System.out.println("noRecordsNeeded="+noRecordsNeeded+", currentStream.getPos()="+currentStream.getPos() - // +", totLength="+totLength+", posIndex="+posIndex+", pos.length="+pos.length); + throws IOException + { + if(noRecordsNeeded || posIndex>=pos.length) { return false; } - //System.out.println("numRead="+numRead+" pos["+posIndex+"]="+pos[posIndex]); while(numRead<=pos[posIndex]) { reader.readNextKeyValuePairs(readKey, (IntWritable)readValue); - // System.out.println("**** numRead "+numRead+" -- "+readKey+": "+readValue); if(valueIsWeight) numRead+=((IntWritable)readValue).get(); else @@ -511,14 +466,12 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M key.setIndexes(indexes[posIndex], 1); value.setValue(readKey.get()); posIndex++; - //System.out.println("next: "+key+", "+value); return true; } @Override public void close() throws IOException { - currentStream.close(); - + currentStream.close(); } @Override
