Repository: incubator-systemml
Updated Branches:
  refs/heads/master ce9392741 -> 7f8716b64


Fix mr iqm pick range record reader (current part weights), incl cleanup

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2fa69cd6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2fa69cd6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2fa69cd6

Branch: refs/heads/master
Commit: 2fa69cd668e76e90baabfb2dcfdf7c549f6b3a2f
Parents: ce93927
Author: Matthias Boehm <[email protected]>
Authored: Sat Feb 13 17:46:58 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Feb 13 17:46:58 2016 -0800

----------------------------------------------------------------------
 .../matrix/sort/PickFromCompactInputFormat.java | 191 +++++++------------
 1 file changed, 72 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2fa69cd6/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
index 7e8aef0..d03afc8 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/sort/PickFromCompactInputFormat.java
@@ -48,8 +48,7 @@ import org.apache.sysml.runtime.matrix.data.Pair;
 
 //key class to read has to be DoubleWritable
 public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, 
MatrixCell>
-{
-               
+{              
        public static final String VALUE_IS_WEIGHT="value.is.weight";
        public static final String INPUT_IS_VECTOR="input.is.vector";
        public static final String SELECTED_RANGES="selected.ranges";
@@ -191,11 +190,9 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                                wt += counts[partID];
                }
                sb.append(partID+"," + (wt-counts[partID]) + ";"); 
-               
                sb.append(sumwt + "," + lbound + "," + ubound);
-               //System.out.println("range string: " + sb.toString());
+               
                job.set(SELECTED_RANGES, sb.toString());
-
                job.setBoolean(INPUT_IS_VECTOR, false);
        }
        
@@ -222,111 +219,89 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
        
        public static class RangePickRecordReader implements 
RecordReader<MatrixIndexes, MatrixCell>
        {
-               //private boolean valueIsWeight=true;
-               protected long totLength;
-               protected FileSystem fs;
-               protected Path path;
-               protected JobConf conf;
+               private Path path;
            
-           protected FSDataInputStream currentStream;
-               
+               private FSDataInputStream currentStream;
                private int startPos=0;
                private int numToRead=0;
-               DoubleWritable readKey=new DoubleWritable();
-               Writable readValue = new IntWritable(0);
+               private DoubleWritable readKey=new DoubleWritable();
+               private Writable readValue = new IntWritable(0);
                private boolean noRecordsNeeded=false;
                private int rawKeyValuesRead=0;
                private int index=0;
-               //private int currentRepeat=0;
+               private int beginPart=-1, endPart=-1, currPart=-1;
+               private double sumwt = 0.0, readWt;  // total weight (set in 
JobConf)
+               private double lbound, ubound;
+               private HashMap<Integer,Double> partWeights = null;
+               private boolean isFirstRecord = true;
+               private ReadWithZeros reader=null;
                
-               int beginPart=-1, endPart=-1, currPart=-1;
-               double sumwt = 0.0, readWt, wtUntilCurrPart;  // total weight 
(set in JobConf)
-               double lbound, ubound;
-               double[] partWeights = null;
-               boolean isFirstRecord = true;
-               
-               
-               //to handle zeros
-               ReadWithZeros reader=null;
-               /*private boolean contain0s=false;
-               private boolean justFound0=false;
-               private DoubleWritable keyAfterZero;
-               private Writable valueAfterZero; 
-               private long numZeros=0;*/
-               
-               private int getIndexInTheArray(String name)
-               {
-                       int i=name.indexOf("part-");
-                       assert(i>=0);
-                       return Integer.parseInt(name.substring(i+5));
-               }
-               
-               private void parseSelectedRangeString(String str) {
-                       String[] f1 = str.split(";");
-                       String[] f2 = null;
-                       
-                       // Each field of the form: "pid,wt" where wt is the 
total wt until the part pid
-                       partWeights = new double[f1.length-1];
-                       for(int i=0; i < f1.length-1; i++) {
-                               f2 = f1[i].split(",");
-                               if(i==0) {
-                                       beginPart = Integer.parseInt(f2[0]);
-                               }
-                               if (i==f1.length-2) {
-                                       endPart = Integer.parseInt(f2[0]);
-                               }
-                               partWeights[i] = Double.parseDouble(f2[1]);
-                       }
-                       
-                       // last field: "sumwt, lbound, ubound"
-                       f2 = f1[f1.length-1].split(",");
-                       sumwt  = Double.parseDouble(f2[0]);
-                       lbound = Double.parseDouble(f2[1]);
-                       ubound = Double.parseDouble(f2[2]);
-               }
-
                @SuppressWarnings("unchecked")
-               public RangePickRecordReader(JobConf job, FileSplit split) 
throws IOException {
+               public RangePickRecordReader(JobConf job, FileSplit split) 
+                       throws IOException 
+               {
                        parseSelectedRangeString(job.get(SELECTED_RANGES));
                        
                        // check if the current part file needs to be processed
                path = split.getPath();
-               totLength = split.getLength();
                currentStream = FileSystem.get(job).open(path);
                currPart = getIndexInTheArray(path.getName());
-               //System.out.println("RangePickRecordReader(): sumwt " + sumwt 
+ " currPart " + currPart + " partRange [" + beginPart + "," + endPart + "]");
                
                if ( currPart < beginPart || currPart > endPart ) {
-                       System.out.println("    currPart is out of range. 
Skipping part!");
                        noRecordsNeeded = true;
                        return;
                }
                
                Class<? extends Writable> valueClass=(Class<? extends 
Writable>) job.getClass(VALUE_CLASS, Writable.class);
                try {
-               //      readKey=keyClass.newInstance();
                                readValue=valueClass.newInstance();
                        } catch (Exception e) {
                                throw new RuntimeException(e);
                        }
-                       //valueIsWeight=job.getBoolean(VALUE_IS_WEIGHT, true);
                        
                        int part0=job.getInt(PARTITION_OF_ZERO, -1);
                        boolean contain0s=false;
                        long numZeros =0;
-               if(part0==currPart)
-               {
+               if(part0==currPart) {
                        contain0s = true;
                        numZeros = job.getLong(NUMBER_OF_ZERO, 0);
                }
                reader=new ReadWithZeros(currentStream, contain0s, numZeros);
                }
                
-               public boolean next(MatrixIndexes key, MatrixCell value) throws 
IOException {
-                       assert(currPart!=-1);
+               private int getIndexInTheArray(String name) {
+                       int i=name.indexOf("part-");
+                       return Integer.parseInt(name.substring(i+5));
+               }
+               
+               private void parseSelectedRangeString(String str) {
+                       String[] f1 = str.split(";");
+                       String[] f2 = null;
                        
+                       // Each field of the form: "pid,wt" where wt is the total wt until the part pid; map is keyed by pid (looked up via currPart)
+                       partWeights = new HashMap<Integer,Double>();
+                       for(int i=0; i < f1.length-1; i++) {
+                               f2 = f1[i].split(",");
+                               if(i==0) {
+                                       beginPart = Integer.parseInt(f2[0]);
+                               }
+                               if (i==f1.length-2) {
+                                       endPart = Integer.parseInt(f2[0]);
+                               }
+                               partWeights.put(Integer.parseInt(f2[0]), Double.parseDouble(f2[1])); // key by part id, not field position
+                       }
+                       
+                       // last field: "sumwt, lbound, ubound"
+                       f2 = f1[f1.length-1].split(",");
+                       sumwt  = Double.parseDouble(f2[0]);
+                       lbound = Double.parseDouble(f2[1]);
+                       ubound = Double.parseDouble(f2[2]);
+               }
+               
+               public boolean next(MatrixIndexes key, MatrixCell value) 
+                       throws IOException 
+               {       
                        if(noRecordsNeeded)
-                               // this part file does not fall within the 
required range of values
                                return false;
 
                        double qLowerD = sumwt*lbound; // lower quantile in 
double 
@@ -334,15 +309,14 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                        
                        // weight until current part
                        if (isFirstRecord) {
-                               readWt = partWeights[currPart];
-                               wtUntilCurrPart = partWeights[currPart];
+                               if( !partWeights.containsKey(currPart) )
+                                       return false; //noRecordsNeeded
+                               readWt = partWeights.get(currPart);
                                isFirstRecord = false;
                        }
                        double tmpWt = 0;
                        
                        if ( currPart == beginPart || currPart == endPart ) {
-                               //readWt = partWeights[currPart];
-                               
                                reader.readNextKeyValuePairs(readKey, 
(IntWritable)readValue);
                                tmpWt = ((IntWritable)readValue).get();
 
@@ -356,14 +330,13 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                                        key.setIndexes(++index,1);
                                        value.setValue(readKey.get()*tmpWt);
                                        readWt += tmpWt;
-                                       //System.out.println("currpart " + 
currPart + ", return (" + index +",1): " + readKey.get()*tmpWt + "| [" + 
readKey.get() + "," + tmpWt +"] readWt=" + readWt);
                                        return true;
                                }
                                else {
                                        return false;
                                }
                        }
-                       else { //if(currPart != beginPart && currPart != 
endPart) {
+                       else { 
                                // read full part
                                reader.readNextKeyValuePairs(readKey, 
(IntWritable)readValue);
                                tmpWt = ((IntWritable)readValue).get();
@@ -372,7 +345,6 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                                value.setValue(readKey.get()*tmpWt);
                                
                                readWt += tmpWt;
-                               //System.out.println("currpart " + currPart + 
", return (" + index +",1): " + readKey.get()*tmpWt + "| [" + readKey.get() + 
"," + tmpWt +"] readWt=" + readWt);
                                return true;
                        }
                }
@@ -408,68 +380,58 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                }
        }
        
+       /**
+        * 
+        */
        public static class PickRecordReader implements 
RecordReader<MatrixIndexes, MatrixCell>
        {
                private boolean valueIsWeight=true;
-               protected long totLength;
-               protected FileSystem fs;
-               protected Path path;
-               protected JobConf conf;
-           
-           protected FSDataInputStream currentStream;
+               private FileSystem fs;
+               private Path path;
+               
+           private FSDataInputStream currentStream;
                private int posIndex=0;
                
                private int[] pos=null; //starting from 0
                private int[] indexes=null;
-               DoubleWritable readKey=new DoubleWritable();
-               Writable readValue;
+               private DoubleWritable readKey=new DoubleWritable();
+               private Writable readValue;
                private int numRead=0;
                private boolean noRecordsNeeded=false;
-               
-               //to handle zeros
                ReadWithZeros reader=null;
-               /*private boolean contain0s=false;
-               private boolean justFound0=false;
-               private DoubleWritable keyAfterZero;
-               private Writable valueAfterZero; 
-               private long numZeros=0;*/
                
                private int getIndexInTheArray(String name)
                {
                        int i=name.indexOf("part-");
-                       assert(i>=0);
                        return Integer.parseInt(name.substring(i+5));
                }
                
                @SuppressWarnings("unchecked")
                public PickRecordReader(JobConf job, FileSplit split)
-                               throws IOException{
+                       throws IOException
+               {
                        fs = FileSystem.get(job);
                path = split.getPath();
-               totLength = split.getLength();
                currentStream = fs.open(path);
                int partIndex=getIndexInTheArray(path.getName());
                String arrStr=job.get(SELECTED_POINTS_PREFIX+partIndex);
-               //System.out.println("read back in the recordreader: "+arrStr);
-               if(arrStr==null || arrStr.isEmpty())
-               {
+               if(arrStr==null || arrStr.isEmpty()) {
                        noRecordsNeeded=true;
                        return;
                }
+               
                String[] strs=arrStr.split(",");
                pos=new int[strs.length];
                indexes=new int[strs.length];
-               for(int i=0; i<strs.length; i++)
-               {
+               for(int i=0; i<strs.length; i++) {
                        String[] temp=strs[i].split(":");
                        pos[i]=Integer.parseInt(temp[0]);
                        indexes[i]=Integer.parseInt(temp[1]);
                }
-           //  Class<? extends WritableComparable> keyClass=(Class<? extends 
WritableComparable>) job.getClass(KEY_CLASS, WritableComparable.class);
+               
                Class<? extends Writable> valueClass=(Class<? extends 
Writable>) job.getClass(VALUE_CLASS, Writable.class);
                try {
-               //      readKey=keyClass.newInstance();
-                               readValue=valueClass.newInstance();
+                       readValue=valueClass.newInstance();
                        } catch (Exception e) {
                                throw new RuntimeException(e);
                        }
@@ -478,8 +440,7 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                        int part0=job.getInt(PARTITION_OF_ZERO, -1);
                        boolean contain0s=false;
                        long numZeros =0;
-               if(part0==partIndex)
-               {
+               if(part0==partIndex) {
                        contain0s = true;
                        numZeros = job.getLong(NUMBER_OF_ZERO, 0);
                }
@@ -487,21 +448,15 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                }
                
                public boolean next(MatrixIndexes key, MatrixCell value)
-               throws IOException {
-                       
-                       if(noRecordsNeeded || posIndex>=pos.length)
-                       {
-                               //System.out.println("return false");
-                               
//System.out.println("noRecordsNeeded="+noRecordsNeeded+", 
currentStream.getPos()="+currentStream.getPos()
-                               //              +", totLength="+totLength+", 
posIndex="+posIndex+", pos.length="+pos.length);
+                       throws IOException 
+               {
+                       if(noRecordsNeeded || posIndex>=pos.length) {
                                return false;
                        }
                
-                       //System.out.println("numRead="+numRead+" 
pos["+posIndex+"]="+pos[posIndex]);
                        while(numRead<=pos[posIndex])
                        {
                                reader.readNextKeyValuePairs(readKey, 
(IntWritable)readValue);
-                       //      System.out.println("**** numRead "+numRead+" -- 
"+readKey+": "+readValue);
                                if(valueIsWeight)
                                        numRead+=((IntWritable)readValue).get();
                                else
@@ -511,14 +466,12 @@ public class PickFromCompactInputFormat extends FileInputFormat<MatrixIndexes, M
                        key.setIndexes(indexes[posIndex], 1);
                        value.setValue(readKey.get());
                        posIndex++;
-                       //System.out.println("next: "+key+", "+value);
                        return true;
                }
 
                @Override
                public void close() throws IOException {
-                       currentStream.close();
-                       
+                       currentStream.close();                  
                }
 
                @Override

Reply via email to