Fix MR IQM/quantile record readers (bounded partition read, progress)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/240143bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/240143bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/240143bb

Branch: refs/heads/master
Commit: 240143bbad57489872b86114cee4ac206fa9c2a4
Parents: 2fa69cd
Author: Matthias Boehm <[email protected]>
Authored: Sat Feb 13 22:16:30 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Feb 13 22:16:30 2016 -0800

----------------------------------------------------------------------
 .../matrix/sort/PickFromCompactInputFormat.java | 53 +++++++-------------
 .../runtime/matrix/sort/ReadWithZeros.java      | 13 +++--
 2 files changed, 25 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
index d03afc8..7dd3c13 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
@@ -222,12 +222,10 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                private Path path;
            
                private FSDataInputStream currentStream;
-               private int startPos=0;
-               private int numToRead=0;
+               protected long totLength;
                private DoubleWritable readKey=new DoubleWritable();
-               private Writable readValue = new IntWritable(0);
+               private IntWritable readValue = new IntWritable(0);
                private boolean noRecordsNeeded=false;
-               private int rawKeyValuesRead=0;
                private int index=0;
                private int beginPart=-1, endPart=-1, currPart=-1;
                private double sumwt = 0.0, readWt;  // total weight (set in 
JobConf)
@@ -236,7 +234,6 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                private boolean isFirstRecord = true;
                private ReadWithZeros reader=null;
                
-               @SuppressWarnings("unchecked")
                public RangePickRecordReader(JobConf job, FileSplit split) 
                        throws IOException 
                {
@@ -244,6 +241,7 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                        
                        // check if the current part file needs to be processed
                path = split.getPath();
+               totLength = split.getLength();
                currentStream = FileSystem.get(job).open(path);
                currPart = getIndexInTheArray(path.getName());
                
@@ -252,13 +250,6 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                        return;
                }
                
-               Class<? extends Writable> valueClass=(Class<? extends 
Writable>) job.getClass(VALUE_CLASS, Writable.class);
-               try {
-                               readValue=valueClass.newInstance();
-                       } catch (Exception e) {
-                               throw new RuntimeException(e);
-                       }
-                       
                        int part0=job.getInt(PARTITION_OF_ZERO, -1);
                        boolean contain0s=false;
                        long numZeros =0;
@@ -317,20 +308,20 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                        double tmpWt = 0;
                        
                        if ( currPart == beginPart || currPart == endPart ) {
-                               reader.readNextKeyValuePairs(readKey, 
(IntWritable)readValue);
-                               tmpWt = ((IntWritable)readValue).get();
+                               boolean ret = 
reader.readNextKeyValuePairs(readKey, readValue);
+                               tmpWt = readValue.get();
 
                                while(readWt+tmpWt < qLowerD) {
                                        readWt += tmpWt;
-                                       reader.readNextKeyValuePairs(readKey, 
(IntWritable)readValue);
-                                       tmpWt = ((IntWritable)readValue).get();
+                                       ret &= 
reader.readNextKeyValuePairs(readKey, readValue);
+                                       tmpWt = readValue.get();
                                }
                                
                                if((readWt<qLowerD && readWt+tmpWt >= qLowerD) 
|| (readWt+tmpWt <= qUpperD) || (readWt<qUpperD && readWt+tmpWt>=qUpperD)) {
                                        key.setIndexes(++index,1);
                                        value.setValue(readKey.get()*tmpWt);
                                        readWt += tmpWt;
-                                       return true;
+                                       return ret;
                                }
                                else {
                                        return false;
@@ -338,20 +329,19 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                        }
                        else { 
                                // read full part
-                               reader.readNextKeyValuePairs(readKey, 
(IntWritable)readValue);
-                               tmpWt = ((IntWritable)readValue).get();
+                               boolean ret = 
reader.readNextKeyValuePairs(readKey, readValue);
+                               tmpWt = readValue.get();
                                
                                key.setIndexes(++index,1);
                                value.setValue(readKey.get()*tmpWt);
                                
                                readWt += tmpWt;
-                               return true;
+                               return ret;
                        }
                }
                
                @Override
                public void close() throws IOException {
-                       //DO Nothing
                        currentStream.close();
                }
 
@@ -373,10 +363,8 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
 
                @Override
                public float getProgress() throws IOException {
-                       if(numToRead>0)
-                               return 
(float)(rawKeyValuesRead-startPos)/(float)numToRead;
-                       else
-                               return 100.0f;
+                       float progress = (float) getPos() / totLength;
+                       return (progress>=0 && progress<=1) ? progress : 1.0f;
                }
        }
        
@@ -394,8 +382,8 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                
                private int[] pos=null; //starting from 0
                private int[] indexes=null;
-               private DoubleWritable readKey=new DoubleWritable();
-               private Writable readValue;
+               private DoubleWritable readKey = new DoubleWritable();
+               private IntWritable readValue = new IntWritable();
                private int numRead=0;
                private boolean noRecordsNeeded=false;
                ReadWithZeros reader=null;
@@ -406,7 +394,6 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                        return Integer.parseInt(name.substring(i+5));
                }
                
-               @SuppressWarnings("unchecked")
                public PickRecordReader(JobConf job, FileSplit split)
                        throws IOException
                {
@@ -429,12 +416,6 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                        indexes[i]=Integer.parseInt(temp[1]);
                }
                
-               Class<? extends Writable> valueClass=(Class<? extends 
Writable>) job.getClass(VALUE_CLASS, Writable.class);
-               try {
-                       readValue=valueClass.newInstance();
-                       } catch (Exception e) {
-                               throw new RuntimeException(e);
-                       }
                        valueIsWeight=job.getBoolean(VALUE_IS_WEIGHT, true);
                        
                        int part0=job.getInt(PARTITION_OF_ZERO, -1);
@@ -456,9 +437,9 @@ public class PickFromCompactInputFormat extends 
FileInputFormat<MatrixIndexes, M
                
                        while(numRead<=pos[posIndex])
                        {
-                               reader.readNextKeyValuePairs(readKey, 
(IntWritable)readValue);
+                               reader.readNextKeyValuePairs(readKey, 
readValue);
                                if(valueIsWeight)
-                                       numRead+=((IntWritable)readValue).get();
+                                       numRead+=readValue.get();
                                else
                                        numRead++;
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/240143bb/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
index dc578a8..8793be9 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/ReadWithZeros.java
@@ -28,9 +28,7 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 
 public class ReadWithZeros 
-{
-
-       
+{      
        private boolean contain0s=false;
        private long numZeros=0;
        private FSDataInputStream currentStream;
@@ -46,8 +44,11 @@ public class ReadWithZeros
                numZeros=num0;
        }
        
-       public void readNextKeyValuePairs(DoubleWritable readKey, IntWritable 
readValue)throws IOException 
+       public boolean readNextKeyValuePairs(DoubleWritable readKey, 
IntWritable readValue)
+               throws IOException 
        {
+               boolean ret = true;
+               
                try {
                        if(contain0s && justFound0)
                        {
@@ -68,7 +69,7 @@ public class ReadWithZeros
                                readValue.set((int)numZeros);
                        }
                        else {
-                               throw e;
+                               ret = false;
                        }
                }
                
@@ -80,5 +81,7 @@ public class ReadWithZeros
                        readKey.set(0);
                        readValue.set((int)numZeros);
                }
+               
+               return ret;
        }
 }

Reply via email to