Repository: systemml
Updated Branches:
  refs/heads/master 50b3c80c6 -> 1f3dfbc19


[SYSTEMML-1941] Fix spark quantile operations w/ non-constant weights

Recently added tests for spark quantile operations (quantile, median,
IQM) revealed incorrect results and other failures. This was because
the respective spark instructions only took the weights into account
for computing the inter-quartile sum but not for finding the actual
quantiles. This did not show up before because most tests only used
constant weights of 1 in order to compare against R, which does not
provide these weighted statistics.

This patch adds the respective tests and essentially reimplements all
spark quantile operations (quantile, median, and IQM) via a new
primitive. The core approach is to compute the sum of weights per
partition, sequentially compute the cumulative sum at the driver to
determine the quantile partitions, and make a final pass to obtain the
actual values as well as partial value proportions. This requires not
just a sorting of values but also a sorted order of partitions and
blocks within each partition. Finally, we now reuse the computation of
corrected IQM values across the CP and spark backends.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1f3dfbc1
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1f3dfbc1
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1f3dfbc1

Branch: refs/heads/master
Commit: 1f3dfbc190320b353461035aa517861f958837ed
Parents: 50b3c80
Author: Matthias Boehm <[email protected]>
Authored: Mon Oct 2 23:21:39 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Oct 3 19:47:06 2017 -0700

----------------------------------------------------------------------
 .../spark/QuantilePickSPInstruction.java        | 265 +++++++++++++------
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 101 ++-----
 .../functions/binary/matrix/QuantileTest.java   | 102 +++----
 .../functions/ternary/QuantileWeightsTest.java  | 114 +++-----
 .../functions/unary/matrix/IQMTest.java         |  81 ++++--
 5 files changed, 334 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/1f3dfbc1/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
index 3fc12ea..d7fb6e4 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
@@ -19,10 +19,16 @@
 
 package org.apache.sysml.runtime.instructions.spark;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 
 import scala.Tuple2;
@@ -54,9 +60,7 @@ public class QuantilePickSPInstruction extends 
BinarySPInstruction {
                        boolean inmem, String opcode, String istr) {
                super(op, in, in2, out, opcode, istr);
                _sptype = SPINSTRUCTION_TYPE.QPick;
-
                _type = type;
-               // inmem ignored here
        }
 
        public static QuantilePickSPInstruction parseInstruction ( String str ) 
@@ -71,27 +75,22 @@ public class QuantilePickSPInstruction extends 
BinarySPInstruction {
                }
                
                //instruction parsing
-               if( parts.length == 4 )
-               {
+               if( parts.length == 4 ) {
                        //instructions of length 4 originate from unary - mr-iqm
-                       //TODO this should be refactored to use pickvaluecount 
lops
                        CPOperand in1 = new CPOperand(parts[1]);
                        CPOperand in2 = new CPOperand(parts[2]);
                        CPOperand out = new CPOperand(parts[3]);
                        OperationTypes ptype = OperationTypes.IQM;
-                       boolean inmem = false;
-                       return new QuantilePickSPInstruction(null, in1, in2, 
out, ptype, inmem, opcode, str);                   
+                       return new QuantilePickSPInstruction(null, in1, in2, 
out, ptype, false, opcode, str);
                }
-               else if( parts.length == 5 )
-               {
+               else if( parts.length == 5 ) {
                        CPOperand in1 = new CPOperand(parts[1]);
                        CPOperand out = new CPOperand(parts[2]);
                        OperationTypes ptype = OperationTypes.valueOf(parts[3]);
                        boolean inmem = Boolean.parseBoolean(parts[4]);
                        return new QuantilePickSPInstruction(null, in1, out, 
ptype, inmem, opcode, str);
                }
-               else if( parts.length == 6 )
-               {
+               else if( parts.length == 6 ) {
                        CPOperand in1 = new CPOperand(parts[1]);
                        CPOperand in2 = new CPOperand(parts[2]);
                        CPOperand out = new CPOperand(parts[3]);
@@ -109,51 +108,37 @@ public class QuantilePickSPInstruction extends 
BinarySPInstruction {
        {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                
-               MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
-               boolean weighted = (mc.getCols()==2);
-               
                //get input rdds
                JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable( input1.getName() );
+               MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(input1.getName());
                
                //NOTE: no difference between inmem/mr pick (see related cp 
instruction), but wrt w/ w/o weights
                //(in contrast to cp instructions, w/o weights does not 
materializes weights of 1)
-               switch( _type ) 
-               {
+               switch( _type ) {
                        case VALUEPICK: {
-                               double sum_wt = weighted ? sumWeights(in) : 
mc.getRows();
-                               ScalarObject quantile = 
ec.getScalarInput(input2.getName(), input2.getValueType(), input2.isLiteral());
-                               long key = 
(long)Math.ceil(quantile.getDoubleValue()*sum_wt);
-                               double val = lookupKey(in, key, 
mc.getRowsPerBlock());
-                               ec.setScalarOutput(output.getName(), new 
DoubleObject(val));
-                               
+                               ScalarObject quantile = 
ec.getScalarInput(input2);
+                               double[] wt = getWeightedQuantileSummary(in, 
mc, quantile.getDoubleValue());
+                               ec.setScalarOutput(output.getName(), new 
DoubleObject(wt[3]));
                                break;
                        }
                        
                        case MEDIAN: {
-                               double sum_wt = weighted ? sumWeights(in) : 
mc.getRows();
-                               long key = (long)Math.ceil(0.5*sum_wt);
-                               double val = lookupKey(in, key, 
mc.getRowsPerBlock());
-                               ec.setScalarOutput(output.getName(), new 
DoubleObject(val));
-                                                               
+                               double[] wt = getWeightedQuantileSummary(in, 
mc, 0.5);
+                               ec.setScalarOutput(output.getName(), new 
DoubleObject(wt[3]));
                                break;
                        }
                        
                        case IQM: {
-                               double sum_wt = weighted ? sumWeights(in) : 
mc.getRows();
-                               long key25 = (long)Math.ceil(0.25*sum_wt);
-                               long key75 = (long)Math.ceil(0.75*sum_wt);      
-                               double val25 = lookupKey(in, key25, 
mc.getRowsPerBlock());
-                               double val75 = lookupKey(in, key75, 
mc.getRowsPerBlock());
+                               double[] wt = getWeightedQuantileSummary(in, 
mc, 0.25, 0.75);
+                               long key25 = (long)Math.ceil(wt[1]);
+                               long key75 = (long)Math.ceil(wt[2]);
                                JavaPairRDD<MatrixIndexes,MatrixBlock> out = in
-                                               .filter(new 
FilterFunction(key25+1,key75,mc.getRowsPerBlock()))
-                                               .mapToPair(new 
ExtractAndSumFunction(key25+1, key75, mc.getRowsPerBlock()));
-                               MatrixBlock mb = 
RDDAggregateUtils.sumStable(out);                                      
-                               double val = (mb.getValue(0, 0) 
-                                            + (key25-0.25*sum_wt)*val25
-                                            - (key75-0.75*sum_wt)*val75)
-                                            / (0.5*sum_wt);
+                                       .filter(new 
FilterFunction(key25+1,key75,mc.getRowsPerBlock()))
+                                       .mapToPair(new 
ExtractAndSumFunction(key25+1, key75, mc.getRowsPerBlock()));
+                               double sum = 
RDDAggregateUtils.sumStable(out).getValue(0, 0);
+                               double val = MatrixBlock.computeIQMCorrection(
+                                       sum, wt[0], wt[3], wt[5], wt[4], wt[6]);
                                ec.setScalarOutput(output.getName(), new 
DoubleObject(val));
-                               
                                break;
                        }
                
@@ -161,25 +146,92 @@ public class QuantilePickSPInstruction extends 
BinarySPInstruction {
                                throw new DMLRuntimeException("Unsupported 
qpick operation type: "+_type);
                }
        }
+       
+       /**
+        * Get a summary of weighted quantiles in in the following form:
+        * sum of weights, (keys of quantiles), (portions of quantiles), 
(values of quantiles)
+        * 
+        * @param w rdd containing values and optionally weights, sorted by 
value
+        * @param mc matrix characteristics
+        * @param quantiles one or more quantiles between 0 and 1.
+        * @return a summary of weighted quantiles
+        * @throws DMLRuntimeException 
+        */
+       private double[] 
getWeightedQuantileSummary(JavaPairRDD<MatrixIndexes,MatrixBlock> w, 
MatrixCharacteristics mc, Double... quantiles)
+               throws DMLRuntimeException 
+       {
+               double[] ret = new double[3*quantiles.length + 1];
+               if( mc.getCols()==2 ) //weighted 
+               {
+                       //sort blocks (values sorted but blocks and partitions 
are not)
+                       w = w.sortByKey();
+                       
+                       //compute cumsum weights per partition
+                       //with assumption that partition aggregates fit into 
memory
+                       List<Tuple2<Integer,Double>> partWeights = w
+                               .mapPartitionsWithIndex(new 
SumWeightsFunction(), false).collect();
+                       
+                       //compute sum of weights
+                       ret[0] = partWeights.stream().mapToDouble(p -> 
p._2()).sum();
+                       
+                       //compute total cumsum and determine partitions
+                       double[] qdKeys = new double[quantiles.length];
+                       long[] qiKeys = new long[quantiles.length];
+                       int[] partitionIDs = new int[quantiles.length];
+                       double[] offsets = new double[quantiles.length];
+                       for( int i=0; i<quantiles.length; i++ ) {
+                               qdKeys[i] = quantiles[i]*ret[0];
+                               qiKeys[i] = (long)Math.ceil(qdKeys[i]);
+                       }
+                       double cumSum = 0;
+                       for( Tuple2<Integer,Double> psum : partWeights ) {
+                               double tmp = cumSum + psum._2();
+                               for(int i=0; i<quantiles.length; i++)
+                                       if( tmp >= qiKeys[i] && partitionIDs[i] 
== 0 ) {
+                                               partitionIDs[i] = psum._1();
+                                               offsets[i] = cumSum;
+                                       }
+                               cumSum = tmp;
+                       }
+                       
+                       //get keys and values for quantile cutoffs 
+                       List<Tuple2<Integer,double[]>> qVals = w
+                               .mapPartitionsWithIndex(new 
ExtractWeightedQuantileFunction(
+                                       mc, qdKeys, qiKeys, partitionIDs, 
offsets), false).collect();
+                       for( Tuple2<Integer,double[]> qVal : qVals ) {
+                               ret[qVal._1()+1] = qVal._2()[0];
+                               ret[qVal._1()+quantiles.length+1] = 
qVal._2()[1];
+                               ret[qVal._1()+2*quantiles.length+1] = 
qVal._2()[2];
+                       }
+               }
+               else {
+                       ret[0] = mc.getRows();
+                       for( int i=0; i<quantiles.length; i++ ){
+                               ret[i+1] = quantiles[i] * mc.getRows();
+                               ret[i+quantiles.length+1] = 
Math.ceil(ret[i+1])-ret[i+1];
+                               ret[i+2*quantiles.length+1] = lookupKey(w, 
+                                       (long)Math.ceil(ret[i+1]), 
mc.getRowsPerBlock());
+                       }
+               }
+               
+               return ret;
+       }
 
        private double lookupKey(JavaPairRDD<MatrixIndexes,MatrixBlock> in, 
long key, int brlen)
+               throws DMLRuntimeException
        {
                long rix = UtilFunctions.computeBlockIndex(key, brlen);
-               long pos = UtilFunctions.computeCellInBlock(key, brlen);        
        
-                               
+               long pos = UtilFunctions.computeCellInBlock(key, brlen);
                List<MatrixBlock> val = in.lookup(new MatrixIndexes(rix,1));
+               if( val.isEmpty() )
+                       throw new DMLRuntimeException("Invalid key lookup in 
empty list.");
+               MatrixBlock tmp = val.get(0);
+               if( tmp.getNumRows() <= pos )
+                       throw new DMLRuntimeException("Invalid key lookup for " 
+
+                               pos + " in block of size " + 
tmp.getNumRows()+"x"+tmp.getNumColumns());
                return val.get(0).quickGetValue((int)pos, 0);
        }
-
-       private double sumWeights(JavaPairRDD<MatrixIndexes,MatrixBlock> in)
-       {
-               JavaPairRDD<MatrixIndexes,MatrixBlock> tmp = in
-                               .mapValues(new ExtractAndSumWeightsFunction());
-               MatrixBlock val = RDDAggregateUtils.sumStable(tmp);
-               
-               return val.quickGetValue(0, 0);
-       }
-
+       
        private static class FilterFunction implements 
Function<Tuple2<MatrixIndexes,MatrixBlock>, Boolean> 
        {
                private static final long serialVersionUID = 
-8249102381116157388L;
@@ -188,8 +240,7 @@ public class QuantilePickSPInstruction extends 
BinarySPInstruction {
                private long _minRowIndex;
                private long _maxRowIndex;
                
-               public FilterFunction(long key25, long key75, int brlen)
-               {
+               public FilterFunction(long key25, long key75, int brlen) {
                        _minRowIndex = UtilFunctions.computeBlockIndex(key25, 
brlen);
                        _maxRowIndex = UtilFunctions.computeBlockIndex(key75, 
brlen);
                }
@@ -227,41 +278,101 @@ public class QuantilePickSPInstruction extends 
BinarySPInstruction {
                {
                        MatrixIndexes ix = arg0._1();
                        MatrixBlock mb = arg0._2();
-                       
-                       if( _minRowIndex==_maxRowIndex ){
-                               mb = mb.sliceOperations(_minPos-1, _maxPos-1, 
0, 0, new MatrixBlock());
-                       }
-                       else if( ix.getRowIndex() == _minRowIndex ) {
-                               mb = mb.sliceOperations(_minPos, 
mb.getNumRows()-1, 0, 0, new MatrixBlock());   
-                       }
-                       else if( ix.getRowIndex() == _maxRowIndex ) {
-                               mb = mb.sliceOperations(0, _maxPos, 0, 0, new 
MatrixBlock());   
-                       }
-                       
-                       //create output (with correction)
+                       int rl = (ix.getRowIndex() == _minRowIndex) ? _minPos : 
0;
+                       int ru = (ix.getRowIndex() == _maxRowIndex) ? _maxPos+1 
: mb.getNumRows();
                        MatrixBlock ret = new MatrixBlock(1,2,false);
-                       ret.setValue(0, 0, mb.sum());
-                       
-                       return new Tuple2<MatrixIndexes,MatrixBlock>(new 
MatrixIndexes(1,1), ret);
+                       ret.setValue(0, 0, (mb.getNumColumns()==1) ? 
+                               sum(mb, rl, ru) : sumWeighted(mb, rl, ru));
+                       return new Tuple2<>(new MatrixIndexes(1,1), ret);
                }
+               
+               private static double sum(MatrixBlock mb, int rl, int ru) {
+                       double sum = 0;
+                       for(int i=rl; i<ru; i++)
+                               sum += mb.quickGetValue(i, 0);
+                       return sum;
+               }
+               
+               private static double sumWeighted(MatrixBlock mb, int rl, int 
ru) {
+                       double sum = 0;
+                       for(int i=rl; i<ru; i++)
+                               sum += mb.quickGetValue(i, 0)
+                                       * mb.quickGetValue(i, 1);
+                       return sum;
+               }
        }
 
-       private static class ExtractAndSumWeightsFunction implements 
Function<MatrixBlock,MatrixBlock> 
+       private static class SumWeightsFunction implements 
Function2<Integer,Iterator<Tuple2<MatrixIndexes,MatrixBlock>>,Iterator<Tuple2<Integer,
 Double>>> 
        {
                private static final long serialVersionUID = 
7169831202450745373L;
 
                @Override
-               public MatrixBlock call(MatrixBlock arg0) 
+               public Iterator<Tuple2<Integer, Double>> call(Integer v1, 
Iterator<Tuple2<MatrixIndexes, MatrixBlock>> v2)
                        throws Exception 
                {
-                       //slice operation (2nd column)
-                        MatrixBlock mb = arg0.sliceOperations(0, 
arg0.getNumRows()-1, 1, 1, new MatrixBlock());
+                       //aggregate partition weights (in sorted order)
+                       double sum = 0;
+                       while( v2.hasNext() )
+                               sum += v2.next()._2().sumWeightForQuantile();
                        
-                       //create output (with correction)
-                       MatrixBlock ret = new MatrixBlock(1,2,false);
-                       ret.setValue(0, 0, mb.sum());
+                       //return tuple for partition aggregate
+                       return Arrays.asList(new Tuple2<>(v1,sum)).iterator();
+               }
+       }
+       
+       private static class ExtractWeightedQuantileFunction implements 
Function2<Integer,Iterator<Tuple2<MatrixIndexes,MatrixBlock>>,Iterator<Tuple2<Integer,
 double[]>>> 
+       {
+               private static final long serialVersionUID = 
4879975971050093739L;
+               private final MatrixCharacteristics _mc;
+               private final double[] _qdKeys;
+               private final long[] _qiKeys;
+               private final int[] _qPIDs;
+               private final double[] _offsets;
+               
+               public ExtractWeightedQuantileFunction(MatrixCharacteristics 
mc, double[] qdKeys, long[] qiKeys, int[] qPIDs, double[] offsets) {
+                       _mc = mc;
+                       _qdKeys = qdKeys;
+                       _qiKeys = qiKeys;
+                       _qPIDs = qPIDs;
+                       _offsets = offsets;
+               }
+
+               @Override
+               public Iterator<Tuple2<Integer, double[]>> call(Integer v1, 
Iterator<Tuple2<MatrixIndexes, MatrixBlock>> v2) 
+                       throws Exception 
+               {
+                       //early abort for unnecessary partitions
+                       if( !ArrayUtils.contains(_qPIDs, v1) )
+                               return Collections.emptyIterator();
+                       
+                       //determine which quantiles are active
+                       int qlen = (int)Arrays.stream(_qPIDs).filter(i -> 
i==v1).count();
+                       int[] qix = new int[qlen];
+                       for(int i=0, pos=0; i<_qPIDs.length; i++)
+                               if( _qPIDs[i]==v1 )
+                                       qix[pos++] = i;
+                       double offset = _offsets[qix[0]];
                        
-                       return ret;
+                       //iterate over blocks and determine quantile positions
+                       ArrayList<Tuple2<Integer,double[]>> ret = new 
ArrayList<>();
+                       while( v2.hasNext() ) {
+                               Tuple2<MatrixIndexes, MatrixBlock> tmp = 
v2.next();
+                               MatrixIndexes ix = tmp._1();
+                               MatrixBlock mb = tmp._2();
+                               for( int i=0; i<mb.getNumRows(); i++ ) {
+                                       double val = mb.quickGetValue(i, 1);
+                                       for( int j=0; j<qlen; j++ ) {
+                                               if( offset+val >= 
_qiKeys[qix[j]] ) {
+                                                       long pos = 
UtilFunctions.computeCellIndex(ix.getRowIndex(), _mc.getRowsPerBlock(), i);
+                                                       double posPart = 
offset+val - _qdKeys[qix[j]];
+                                                       ret.add(new 
Tuple2<>(qix[j], new double[]{pos, posPart, mb.quickGetValue(i, 0)}));
+                                                       _qiKeys[qix[j]] = 
Long.MAX_VALUE;
+                                               }
+                                       }
+                                       offset += val;
+                               }
+                       }
+                       return ret.iterator();
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/1f3dfbc1/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index ef32481..5b1e663 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -4680,92 +4680,39 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
        }
        
        public double interQuartileMean() throws DMLRuntimeException {
-               
+               //input state: rlen x 2, values and weights, sorted by weight
+               //approach: determine q25 and q75 keys by cumsum of weights
                double sum_wt = sumWeightForQuantile();
-               
                double q25d = 0.25*sum_wt;
                double q75d = 0.75*sum_wt;
-               
                int q25i = (int) Math.ceil(q25d);
                int q75i = (int) Math.ceil(q75d);
                
-               // skip until (but excluding) q25
-               int t = 0, i=-1;
-               while(i<getNumRows() && t < q25i) {
-                       i++;
-                       //System.out.println("    " + i + ": " + 
quickGetValue(i,0) + "," + quickGetValue(i,1));
-                       t += quickGetValue(i,1);
-               }
-               // compute the portion of q25
-               double runningSum = (t-q25d)*quickGetValue(i,0);
+               // find q25 as sum of weights (but excluding from mean)
+               double psum = 0; int i = -1;
+               while(psum < q25i && i < getNumRows())
+                       psum += quickGetValue(++i, 1);
+               double q25Part = psum-q25d; 
+               double q25Val = quickGetValue(i, 0);
                
-               // add until (including) q75
-               while(i<getNumRows() && t < q75i) {
-                       i++;
-                       t += quickGetValue(i,1);
-                       runningSum += quickGetValue(i,0)*quickGetValue(i,1);
+               // compute mean and find q75 as sum of weights (including in 
mean)
+               double sum = 0;
+               while(psum < q75i && i < getNumRows()) {
+                       double v1 = quickGetValue(++i, 0);
+                       double v2 = quickGetValue(i, 1);
+                       psum += v2;
+                       sum += v1 * v2;
                }
-               // subtract additional portion of q75
-               runningSum -= (t-q75d)*quickGetValue(i,0);
+               double q75Part = psum-q75d;
+               double q75Val = quickGetValue(i, 0);
                
-               return runningSum/(sum_wt*0.5);
+               //compute final IQM, incl. correction for q25 and q75 portions 
+               return computeIQMCorrection(sum, sum_wt, q25Part, q25Val, 
q75Part, q75Val);
        }
        
-       /**
-        * Computes the weighted interQuartileMean.
-        * The matrix block ("this" pointer) has two columns, in which the 
first column 
-        * refers to the data and second column denotes corresponding weights.
-        * 
-        * @return InterQuartileMean
-        * @throws DMLRuntimeException if DMLRuntimeException occurs
-        */
-       public double interQuartileMeanOLD() throws DMLRuntimeException {
-               double sum_wt = sumWeightForQuantile();
-               
-               int fromPos = (int) Math.ceil(0.25*sum_wt);
-               int toPos = (int) Math.ceil(0.75*sum_wt);
-               int selectRange = toPos-fromPos; // range: (fromPos,toPos]
-               
-               if ( selectRange == 0 )
-                       return 0.0;
-               
-               int index, count=0;
-               
-               // The first row (0^th row) has value 0.
-               // If it has a non-zero weight i.e., input data has zero values
-               // then "index" must start from 0, otherwise we skip the first 
row 
-               // and start with the next value in the data, which is in the 
1st row.
-               if ( quickGetValue(0,1) > 0 ) 
-                       index = 0;
-               else
-                       index = 1;
-               
-               // keep scanning the weights, until we hit the required 
position <code>fromPos</code>
-               while ( count < fromPos ) {
-                       count += quickGetValue(index,1);
-                       index++;
-               }
-               
-               double runningSum; 
-               double val;
-               int wt, selectedCount;
-               
-               runningSum = (count-fromPos) * quickGetValue(index-1, 0);
-               selectedCount = (count-fromPos);
-               
-               while(count <= toPos ) {
-                       val = quickGetValue(index,0);
-                       wt = (int) quickGetValue(index,1);
-                       
-                       runningSum += (val * Math.min(wt, 
selectRange-selectedCount));
-                       selectedCount += Math.min(wt, 
selectRange-selectedCount);
-                       count += wt;
-                       index++;
-               }
-               
-               //System.out.println(fromPos + ", " + toPos + ": " + count + ", 
"+ runningSum + ", " + selectedCount);
-               
-               return runningSum/selectedCount;
+       public static double computeIQMCorrection(double sum, double sum_wt, 
+               double q25Part, double q25Val, double q75Part, double q75Val) {
+               return (sum + q25Part*q25Val - q75Part*q75Val) / (sum_wt*0.5); 
        }
        
        public MatrixValue pickValues(MatrixValue quantiles, MatrixValue ret) 
@@ -4850,13 +4797,13 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         * @return sum weight for quantile
         * @throws DMLRuntimeException
         */
-       private double sumWeightForQuantile() 
+       public double sumWeightForQuantile() 
                throws DMLRuntimeException 
        {
                double sum_wt = 0;
                for (int i=0; i < getNumRows(); i++ ) {
                        double tmp = quickGetValue(i, 1);
-                       sum_wt += tmp;          
+                       sum_wt += tmp;
                        
                        // test all values not just final sum_wt to ensure that 
non-integer weights
                        // don't cancel each other out; integer weights are 
required by all quantiles, etc

http://git-wip-us.apache.org/repos/asf/systemml/blob/1f3dfbc1/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix/QuantileTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix/QuantileTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix/QuantileTest.java
index acdde70..bd470e7 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix/QuantileTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/binary/matrix/QuantileTest.java
@@ -59,195 +59,159 @@ public class QuantileTest extends AutomatedTestBase
                addTestConfiguration(TEST_NAME2, 
                        new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new 
String[] { "R" }) ); 
                addTestConfiguration(TEST_NAME3, 
-                       new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new 
String[] { "R" }) );              
+                       new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new 
String[] { "R" }) );
        }
        
        @Test
-       public void testQuantile1DenseCP() 
-       {
+       public void testQuantile1DenseCP() {
                runQuantileTest(TEST_NAME1, 0.25, false, ExecType.CP);
        }
        
        @Test
-       public void testQuantile2DenseCP() 
-       {
+       public void testQuantile2DenseCP() {
                runQuantileTest(TEST_NAME1, 0.50, false, ExecType.CP);
        }
        
        @Test
-       public void testQuantile3DenseCP() 
-       {
+       public void testQuantile3DenseCP() {
                runQuantileTest(TEST_NAME1, 0.75, false, ExecType.CP);
        }
        
        @Test
-       public void testQuantile1SparseCP() 
-       {
+       public void testQuantile1SparseCP() {
                runQuantileTest(TEST_NAME1, 0.25, true, ExecType.CP);
        }
        
        @Test
-       public void testQuantile2SparseCP() 
-       {
+       public void testQuantile2SparseCP() {
                runQuantileTest(TEST_NAME1, 0.50, true, ExecType.CP);
        }
        
        @Test
-       public void testQuantile3SparseCP() 
-       {
+       public void testQuantile3SparseCP() {
                runQuantileTest(TEST_NAME1, 0.75, true, ExecType.CP);
        }
        
        @Test
-       public void testQuantile1DenseMR() 
-       {
+       public void testQuantile1DenseMR() {
                runQuantileTest(TEST_NAME1, 0.25, false, ExecType.MR);
        }
        
        @Test
-       public void testQuantile2DenseMR() 
-       {
+       public void testQuantile2DenseMR() {
                runQuantileTest(TEST_NAME1, 0.50, false, ExecType.MR);
        }
        
        @Test
-       public void testQuantile3DenseMR() 
-       {
+       public void testQuantile3DenseMR() {
                runQuantileTest(TEST_NAME1, 0.75, false, ExecType.MR);
        }
        
        @Test
-       public void testQuantile1SparseMR() 
-       {
+       public void testQuantile1SparseMR() {
                runQuantileTest(TEST_NAME1, 0.25, true, ExecType.MR);
        }
        
        @Test
-       public void testQuantile2SparseMR() 
-       {
+       public void testQuantile2SparseMR() {
                runQuantileTest(TEST_NAME1, 0.50, true, ExecType.MR);
        }
        
        @Test
-       public void testQuantile3SparseMR() 
-       {
+       public void testQuantile3SparseMR() {
                runQuantileTest(TEST_NAME1, 0.75, true, ExecType.MR);
        }
 
        @Test
-       public void testQuantile1DenseSP() 
-       {
+       public void testQuantile1DenseSP() {
                runQuantileTest(TEST_NAME1, 0.25, false, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile2DenseSP() 
-       {
+       public void testQuantile2DenseSP() {
                runQuantileTest(TEST_NAME1, 0.50, false, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile3DenseSP() 
-       {
+       public void testQuantile3DenseSP() {
                runQuantileTest(TEST_NAME1, 0.75, false, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile1SparseSP() 
-       {
+       public void testQuantile1SparseSP() {
                runQuantileTest(TEST_NAME1, 0.25, true, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile2SparseSP() 
-       {
+       public void testQuantile2SparseSP() {
                runQuantileTest(TEST_NAME1, 0.50, true, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile3SparseSP() 
-       {
+       public void testQuantile3SparseSP() {
                runQuantileTest(TEST_NAME1, 0.75, true, ExecType.SPARK);
        }
 
        @Test
-       public void testMedianDenseCP() 
-       {
+       public void testMedianDenseCP() {
                runQuantileTest(TEST_NAME2, -1, false, ExecType.CP);
        }
        
        @Test
-       public void testMedianSparseCP() 
-       {
+       public void testMedianSparseCP() {
                runQuantileTest(TEST_NAME2, -1, true, ExecType.CP);
        }
        
        @Test
-       public void testMedianDenseMR() 
-       {
+       public void testMedianDenseMR() {
                runQuantileTest(TEST_NAME2, -1, false, ExecType.MR);
        }
        
        @Test
-       public void testMedianSparseMR() 
-       {
+       public void testMedianSparseMR() {
                runQuantileTest(TEST_NAME2, -1, true, ExecType.MR);
        }
        
        @Test
-       public void testMedianDenseSP() 
-       {
+       public void testMedianDenseSP() {
                runQuantileTest(TEST_NAME2, -1, false, ExecType.SPARK);
        }
 
        @Test
-       public void testMedianSparseSP() 
-       {
+       public void testMedianSparseSP() {
                runQuantileTest(TEST_NAME2, -1, true, ExecType.SPARK);
        }
 
        @Test
-       public void testIQMDenseCP() 
-       {
+       public void testIQMDenseCP() {
                runQuantileTest(TEST_NAME3, -1, false, ExecType.CP);
        }
        
        @Test
-       public void testIQMSparseCP() 
-       {
+       public void testIQMSparseCP() {
                runQuantileTest(TEST_NAME3, -1, true, ExecType.CP);
        }
        
        @Test
-       public void testIQMDenseMR() 
-       {
+       public void testIQMDenseMR() {
                runQuantileTest(TEST_NAME3, -1, false, ExecType.MR);
        }
        
        @Test
-       public void testIQMSparseMR() 
-       {
+       public void testIQMSparseMR() {
                runQuantileTest(TEST_NAME3, -1, true, ExecType.MR);
        }
        
        @Test
-       public void testIQMDenseSP() 
-       {
+       public void testIQMDenseSP() {
                runQuantileTest(TEST_NAME3, -1, false, ExecType.SPARK);
        }
 
        @Test
-       public void testIQMSparseSP() 
-       {
+       public void testIQMSparseSP() {
                runQuantileTest(TEST_NAME3, -1, true, ExecType.SPARK);
        }
        
-       /**
-        * 
-        * @param sparseM1
-        * @param sparseM2
-        * @param instType
-        */
        private void runQuantileTest( String TEST_NAME, double p, boolean 
sparse, ExecType et)
        {
                //rtplatform for MR
@@ -269,7 +233,6 @@ public class QuantileTest extends AutomatedTestBase
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
                        programArgs = new String[]{"-args", input("A"), 
Double.toString(p), output("R")};
-                       
                        fullRScriptName = HOME + TEST_NAME + ".R";
                        rCmd = "Rscript" + " " + fullRScriptName + " " + 
inputDir() + " " + p + " "+ expectedDir();
        
@@ -286,8 +249,7 @@ public class QuantileTest extends AutomatedTestBase
                        HashMap<CellIndex, Double> rfile  = 
readRMatrixFromFS("R");
                        TestUtils.compareMatrices(dmlfile, rfile, eps, 
"Stat-DML", "Stat-R");
                }
-               finally
-               {
+               finally {
                        rtplatform = platformOld;
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/1f3dfbc1/src/test/java/org/apache/sysml/test/integration/functions/ternary/QuantileWeightsTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/ternary/QuantileWeightsTest.java b/src/test/java/org/apache/sysml/test/integration/functions/ternary/QuantileWeightsTest.java
index 2747a3f..44b4282 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/ternary/QuantileWeightsTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/ternary/QuantileWeightsTest.java
@@ -31,12 +31,8 @@ import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
 
-/**
- * 
- */
 public class QuantileWeightsTest extends AutomatedTestBase 
 {
-       
        private final static String TEST_NAME1 = "QuantileWeights";
        private final static String TEST_NAME2 = "MedianWeights";
        private final static String TEST_NAME3 = "IQMWeights";
@@ -51,8 +47,7 @@ public class QuantileWeightsTest extends AutomatedTestBase
        private final static double sparsity2 = 0.3;
        
        @Override
-       public void setUp() 
-       {
+       public void setUp() {
                TestUtils.clearAssertionInformation();
                addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
                addTestConfiguration(TEST_NAME2, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) );
@@ -60,191 +55,155 @@ public class QuantileWeightsTest extends AutomatedTestBase
        }
        
        @Test
-       public void testQuantile1DenseCP() 
-       {
+       public void testQuantile1DenseCP() {
                runQuantileTest(TEST_NAME1, 0.25, false, ExecType.CP);
        }
        
        @Test
-       public void testQuantile2DenseCP() 
-       {
+       public void testQuantile2DenseCP() {
                runQuantileTest(TEST_NAME1, 0.50, false, ExecType.CP);
        }
        
        @Test
-       public void testQuantile3DenseCP() 
-       {
+       public void testQuantile3DenseCP() {
                runQuantileTest(TEST_NAME1, 0.75, false, ExecType.CP);
        }
        
        @Test
-       public void testQuantile1SparseCP() 
-       {
+       public void testQuantile1SparseCP() {
                runQuantileTest(TEST_NAME1, 0.25, true, ExecType.CP);
        }
        
        @Test
-       public void testQuantile2SparseCP() 
-       {
+       public void testQuantile2SparseCP() {
                runQuantileTest(TEST_NAME1, 0.50, true, ExecType.CP);
        }
        
        @Test
-       public void testQuantile3SparseCP() 
-       {
+       public void testQuantile3SparseCP() {
                runQuantileTest(TEST_NAME1, 0.75, true, ExecType.CP);
        }
        
        @Test
-       public void testQuantile1DenseMR() 
-       {
+       public void testQuantile1DenseMR() {
                runQuantileTest(TEST_NAME1, 0.25, false, ExecType.MR);
        }
        
        @Test
-       public void testQuantile2DenseMR() 
-       {
+       public void testQuantile2DenseMR() {
                runQuantileTest(TEST_NAME1, 0.50, false, ExecType.MR);
        }
        
        @Test
-       public void testQuantile3DenseMR() 
-       {
+       public void testQuantile3DenseMR() {
                runQuantileTest(TEST_NAME1, 0.75, false, ExecType.MR);
        }
        
        @Test
-       public void testQuantile1SparseMR() 
-       {
+       public void testQuantile1SparseMR() {
                runQuantileTest(TEST_NAME1, 0.25, true, ExecType.MR);
        }
        
        @Test
-       public void testQuantile2SparseMR() 
-       {
+       public void testQuantile2SparseMR() {
                runQuantileTest(TEST_NAME1, 0.50, true, ExecType.MR);
        }
        
        @Test
-       public void testQuantile3SparseMR() 
-       {
+       public void testQuantile3SparseMR() {
                runQuantileTest(TEST_NAME1, 0.75, true, ExecType.MR);
        }
 
        @Test
-       public void testQuantile1DenseSP() 
-       {
+       public void testQuantile1DenseSP() {
                runQuantileTest(TEST_NAME1, 0.25, false, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile2DenseSP() 
-       {
+       public void testQuantile2DenseSP() {
                runQuantileTest(TEST_NAME1, 0.50, false, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile3DenseSP() 
-       {
+       public void testQuantile3DenseSP() {
                runQuantileTest(TEST_NAME1, 0.75, false, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile1SparseSP() 
-       {
+       public void testQuantile1SparseSP() {
                runQuantileTest(TEST_NAME1, 0.25, true, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile2SparseSP() 
-       {
+       public void testQuantile2SparseSP() {
                runQuantileTest(TEST_NAME1, 0.50, true, ExecType.SPARK);
        }
        
        @Test
-       public void testQuantile3SparseSP() 
-       {
+       public void testQuantile3SparseSP() {
                runQuantileTest(TEST_NAME1, 0.75, true, ExecType.SPARK);
        }
 
        @Test
-       public void testMedianDenseCP() 
-       {
+       public void testMedianDenseCP() {
                runQuantileTest(TEST_NAME2, -1, false, ExecType.CP);
        }
        
        @Test
-       public void testMedianSparseCP() 
-       {
+       public void testMedianSparseCP() {
                runQuantileTest(TEST_NAME2, -1, true, ExecType.CP);
        }
        
        @Test
-       public void testMedianDenseMR() 
-       {
+       public void testMedianDenseMR() {
                runQuantileTest(TEST_NAME2, -1, false, ExecType.MR);
        }
        
        @Test
-       public void testMedianSparseMR() 
-       {
+       public void testMedianSparseMR() {
                runQuantileTest(TEST_NAME2, -1, true, ExecType.MR);
        }
        
        @Test
-       public void testMedianDenseSP() 
-       {
+       public void testMedianDenseSP() {
                runQuantileTest(TEST_NAME2, -1, false, ExecType.SPARK);
        }
 
        @Test
-       public void testMedianSparseSP() 
-       {
+       public void testMedianSparseSP() {
                runQuantileTest(TEST_NAME2, -1, true, ExecType.SPARK);
        }
 
        @Test
-       public void testIQMDenseCP() 
-       {
+       public void testIQMDenseCP() {
                runQuantileTest(TEST_NAME3, -1, false, ExecType.CP);
        }
        
        @Test
-       public void testIQMSparseCP() 
-       {
+       public void testIQMSparseCP() {
                runQuantileTest(TEST_NAME3, -1, true, ExecType.CP);
        }
        
        @Test
-       public void testIQMDenseMR() 
-       {
+       public void testIQMDenseMR() {
                runQuantileTest(TEST_NAME3, -1, false, ExecType.MR);
        }
        
        @Test
-       public void testIQMSparseMR() 
-       {
+       public void testIQMSparseMR() {
                runQuantileTest(TEST_NAME3, -1, true, ExecType.MR);
        }
        
        @Test
-       public void testIQMDenseSP() 
-       {
+       public void testIQMDenseSP() {
                runQuantileTest(TEST_NAME3, -1, false, ExecType.SPARK);
        }
 
        @Test
-       public void testIQMSparseSP() 
-       {
+       public void testIQMSparseSP() {
                runQuantileTest(TEST_NAME3, -1, true, ExecType.SPARK);
        }
        
-       /**
-        * 
-        * @param sparseM1
-        * @param sparseM2
-        * @param instType
-        */
        private void runQuantileTest( String TEST_NAME, double p, boolean 
sparse, ExecType et)
        {
                //rtplatform for MR
@@ -268,11 +227,10 @@ public class QuantileWeightsTest extends AutomatedTestBase
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
                        programArgs = new String[]{"-args", 
                                input("A"), input("W"), Double.toString(p), 
output("R")};
-                       
                        fullRScriptName = HOME + TEST_NAME + ".R";
                        rCmd = "Rscript" + " " + fullRScriptName + " " + 
                                inputDir() + " " + p + " " + expectedDir();
-       
+                       
                        //generate actual dataset (always dense because values 
<=0 invalid)
                        double sparsitya = sparse ? sparsity2 : sparsity1;
                        double[][] A = getRandomMatrix(rows, 1, 1, maxVal, 
sparsitya, 1236); 
@@ -288,11 +246,9 @@ public class QuantileWeightsTest extends AutomatedTestBase
                        HashMap<CellIndex, Double> rfile  = 
readRMatrixFromFS("R");
                        TestUtils.compareMatrices(dmlfile, rfile, eps, 
"Stat-DML", "Stat-R");
                }
-               finally
-               {
-                       rtplatform = platformOld;
+               finally {
                        DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+                       rtplatform = platformOld;
                }
        }
-
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/1f3dfbc1/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/IQMTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/IQMTest.java b/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/IQMTest.java
index 84bfcc7..5d931c2 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/IQMTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/unary/matrix/IQMTest.java
@@ -22,20 +22,17 @@ package org.apache.sysml.test.integration.functions.unary.matrix;
 import static org.junit.Assert.assertTrue;
 
 import org.junit.Test;
-
+import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
 
 
-
 public class IQMTest extends AutomatedTestBase 
 {
-       
        private enum TEST_TYPE { 
                IQM ("IQM");
-                                       
                String scriptName = null;
                TEST_TYPE(String name) {
                        this.scriptName = name;
@@ -45,8 +42,7 @@ public class IQMTest extends AutomatedTestBase
        private final static String TEST_DIR = "functions/unary/matrix/";
        private static final String TEST_CLASS_DIR = TEST_DIR + 
IQMTest.class.getSimpleName() + "/";
 
-       private final static String[] datasets 
-       = {
+       private final static String[] datasets = {
                "2.2 3.2 3.7 4.4 5.3 5.7 6.1 6.4 7.2 7.8",  // IQM = 
5.3100000000000005
                "2 3 4 1 2 3 4",                                                
        // IQM = 2.7142857142857144
                "1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1",        // IQM = 1
@@ -60,16 +56,14 @@ public class IQMTest extends AutomatedTestBase
        private final static double[] dataLengths = { 10, 7, 19, 7, 9, 20, 10, 21};
        private final static double[] expectedResults = {5.3100000000000005, 2.7142857142857144, 1, 1, 9, 8, 4, 1.2857142857142858};
                
-       private final static String[] weightedDatasets 
-       = {
+       private final static String[] weightedDatasets = {
                "1 1 1 1 1 1 1",                                        // weighted IQM = 1
                "1 3 5 7 9 11 13 15 17",                        // weighted IQM = 8
                "-1 -3 -5   0 7  9 -11 13 15 17",       // weighted IQM = 1.2857142857142858
                "-1 -3 -5 -11 0  7   9 13 15 17"        // weighted IQM = 0
        };
                
-       private final static String[] weights 
-       = {
+       private final static String[] weights = {
                "2 3 4 1  2 3 4",
                "2 3 2 4  2 1 2 3 1",
                "2 3 2 4  2 1 2 3 1 1",
@@ -165,6 +159,46 @@ public class IQMTest extends AutomatedTestBase
        }
        
        @Test
+       public void testIQM1_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 1, false);
+       }
+       
+       @Test
+       public void testIQM2_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 2, false);
+       }
+       
+       @Test
+       public void testIQM3_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 3, false);
+       }
+       
+       @Test
+       public void testIQM4_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 4, false);
+       }
+       
+       @Test
+       public void testIQM5_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 5, false);
+       }
+       
+       @Test
+       public void testIQM6_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 6, false);
+       }
+       
+       @Test
+       public void testIQM7_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 7, false);
+       }
+       
+       @Test
+       public void testIQM8_SP() {
+               runTest(RUNTIME_PLATFORM.SPARK, 8, false);
+       }
+       
+       @Test
        public void testIQM1wt() {
                runTest(RUNTIME_PLATFORM.HYBRID, 1, true);
        }
@@ -206,25 +240,21 @@ public class IQMTest extends AutomatedTestBase
        
        @Test
        public void testIQM1wt_SP() {
-               if(rtplatform == RUNTIME_PLATFORM.SPARK)
                runTest(RUNTIME_PLATFORM.SPARK, 1, true);
        }
        
        @Test
        public void testIQM2wt_SP() {
-               if(rtplatform == RUNTIME_PLATFORM.SPARK)
                runTest(RUNTIME_PLATFORM.SPARK, 2, true);
        }
        
        @Test
        public void testIQM3wt_SP() {
-               if(rtplatform == RUNTIME_PLATFORM.SPARK)
                runTest(RUNTIME_PLATFORM.SPARK, 3, true);
        }
        
        @Test
        public void testIQM4wt_SP() {
-               if(rtplatform == RUNTIME_PLATFORM.SPARK)
                runTest(RUNTIME_PLATFORM.SPARK, 4, true);
        }
        
@@ -233,6 +263,10 @@ public class IQMTest extends AutomatedTestBase
                RUNTIME_PLATFORM rtOld = rtplatform;
                rtplatform = rt;
                
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+               
                try
                {
                        TEST_TYPE test = TEST_TYPE.IQM;
@@ -263,10 +297,8 @@ public class IQMTest extends AutomatedTestBase
                        
                        config.addVariable("rows", rows);
                        config.addVariable("cols", 1);
-       
                        loadTestConfiguration(config);
-       
-                       /* This is for running the junit test the new way, 
i.e., construct the arguments directly */
+                       
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + test.scriptName + ".dml";
                        String outFile = output("iqmFile");
@@ -275,24 +307,21 @@ public class IQMTest extends AutomatedTestBase
                                outFile, wtOutFile };
                        
                        runTest(true, false, null, -1);
-       
+                       
                        double IQM = TestUtils.readDMLScalar(outFile);
                        double wtIQM = TestUtils.readDMLScalar(wtOutFile);
                        
                        if(isWeighted) {
-                               assertTrue("Incorrect weighted inter quartile mean", wtIQM == expectedIQM);
+                               assertTrue("Incorrect weighted inter quartile mean "+wtIQM+" vs "+expectedIQM, wtIQM == expectedIQM);
                        }
                        else {
-                               assertTrue("Incorrect inter quartile mean", wtIQM == IQM);
-                               assertTrue("Incorrect inter quartile mean", wtIQM == expectedIQM);
+                               assertTrue("Incorrect inter quartile mean "+wtIQM+" vs "+IQM, wtIQM == IQM);
+                               assertTrue("Incorrect inter quartile mean "+wtIQM+" vs "+expectedIQM, wtIQM == expectedIQM);
                        }
                }
-               finally
-               {
-                       //reset runtime platform
+               finally {
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
                        rtplatform = rtOld;
                }
        }
-       
-       
 }

Reply via email to