Fix MR IQM/quantile record readers (bounded partition read, progress) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/240143bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/240143bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/240143bb
Branch: refs/heads/master Commit: 240143bbad57489872b86114cee4ac206fa9c2a4 Parents: 2fa69cd Author: Matthias Boehm <[email protected]> Authored: Sat Feb 13 22:16:30 2016 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Feb 13 22:16:30 2016 -0800 ---------------------------------------------------------------------- .../matrix/sort/PickFromCompactInputFormat.java | 53 +++++++------------- .../runtime/matrix/sort/ReadWithZeros.java | 13 +++-- 2 files changed, 25 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java index d03afc8..7dd3c13 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java @@ -222,12 +222,10 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M private Path path; private FSDataInputStream currentStream; - private int startPos=0; - private int numToRead=0; + protected long totLength; private DoubleWritable readKey=new DoubleWritable(); - private Writable readValue = new IntWritable(0); + private IntWritable readValue = new IntWritable(0); private boolean noRecordsNeeded=false; - private int rawKeyValuesRead=0; private int index=0; private int beginPart=-1, endPart=-1, currPart=-1; private double sumwt = 0.0, readWt; // total weight (set in JobConf) @@ -236,7 +234,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M private boolean isFirstRecord = true; private ReadWithZeros reader=null; - 
@SuppressWarnings("unchecked") public RangePickRecordReader(JobConf job, FileSplit split) throws IOException { @@ -244,6 +241,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M // check if the current part file needs to be processed path = split.getPath(); + totLength = split.getLength(); currentStream = FileSystem.get(job).open(path); currPart = getIndexInTheArray(path.getName()); @@ -252,13 +250,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M return; } - Class<? extends Writable> valueClass=(Class<? extends Writable>) job.getClass(VALUE_CLASS, Writable.class); - try { - readValue=valueClass.newInstance(); - } catch (Exception e) { - throw new RuntimeException(e); - } - int part0=job.getInt(PARTITION_OF_ZERO, -1); boolean contain0s=false; long numZeros =0; @@ -317,20 +308,20 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M double tmpWt = 0; if ( currPart == beginPart || currPart == endPart ) { - reader.readNextKeyValuePairs(readKey, (IntWritable)readValue); - tmpWt = ((IntWritable)readValue).get(); + boolean ret = reader.readNextKeyValuePairs(readKey, readValue); + tmpWt = readValue.get(); while(readWt+tmpWt < qLowerD) { readWt += tmpWt; - reader.readNextKeyValuePairs(readKey, (IntWritable)readValue); - tmpWt = ((IntWritable)readValue).get(); + ret &= reader.readNextKeyValuePairs(readKey, readValue); + tmpWt = readValue.get(); } if((readWt<qLowerD && readWt+tmpWt >= qLowerD) || (readWt+tmpWt <= qUpperD) || (readWt<qUpperD && readWt+tmpWt>=qUpperD)) { key.setIndexes(++index,1); value.setValue(readKey.get()*tmpWt); readWt += tmpWt; - return true; + return ret; } else { return false; @@ -338,20 +329,19 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M } else { // read full part - reader.readNextKeyValuePairs(readKey, (IntWritable)readValue); - tmpWt = ((IntWritable)readValue).get(); + boolean ret = 
reader.readNextKeyValuePairs(readKey, readValue); + tmpWt = readValue.get(); key.setIndexes(++index,1); value.setValue(readKey.get()*tmpWt); readWt += tmpWt; - return true; + return ret; } } @Override public void close() throws IOException { - //DO Nothing currentStream.close(); } @@ -373,10 +363,8 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M @Override public float getProgress() throws IOException { - if(numToRead>0) - return (float)(rawKeyValuesRead-startPos)/(float)numToRead; - else - return 100.0f; + float progress = (float) getPos() / totLength; + return (progress>=0 && progress<=1) ? progress : 1.0f; } } @@ -394,8 +382,8 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M private int[] pos=null; //starting from 0 private int[] indexes=null; - private DoubleWritable readKey=new DoubleWritable(); - private Writable readValue; + private DoubleWritable readKey = new DoubleWritable(); + private IntWritable readValue = new IntWritable(); private int numRead=0; private boolean noRecordsNeeded=false; ReadWithZeros reader=null; @@ -406,7 +394,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M return Integer.parseInt(name.substring(i+5)); } - @SuppressWarnings("unchecked") public PickRecordReader(JobConf job, FileSplit split) throws IOException { @@ -429,12 +416,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M indexes[i]=Integer.parseInt(temp[1]); } - Class<? extends Writable> valueClass=(Class<? 
extends Writable>) job.getClass(VALUE_CLASS, Writable.class); - try { - readValue=valueClass.newInstance(); - } catch (Exception e) { - throw new RuntimeException(e); - } valueIsWeight=job.getBoolean(VALUE_IS_WEIGHT, true); int part0=job.getInt(PARTITION_OF_ZERO, -1); @@ -456,9 +437,9 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M while(numRead<=pos[posIndex]) { - reader.readNextKeyValuePairs(readKey, (IntWritable)readValue); + reader.readNextKeyValuePairs(readKey, readValue); if(valueIsWeight) - numRead+=((IntWritable)readValue).get(); + numRead+=readValue.get(); else numRead++; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java index dc578a8..8793be9 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java @@ -28,9 +28,7 @@ import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IntWritable; public class ReadWithZeros -{ - - +{ private boolean contain0s=false; private long numZeros=0; private FSDataInputStream currentStream; @@ -46,8 +44,11 @@ public class ReadWithZeros numZeros=num0; } - public void readNextKeyValuePairs(DoubleWritable readKey, IntWritable readValue)throws IOException + public boolean readNextKeyValuePairs(DoubleWritable readKey, IntWritable readValue) + throws IOException { + boolean ret = true; + try { if(contain0s && justFound0) { @@ -68,7 +69,7 @@ public class ReadWithZeros readValue.set((int)numZeros); } else { - throw e; + ret = false; } } @@ -80,5 +81,7 @@ public class ReadWithZeros readKey.set(0); readValue.set((int)numZeros); } + + return ret; } }
