[SYSTEMML-1967] Fix spark rand instruction (#partitions for sparse)

This patch fixes the spark rand instruction to create the correct number
of partitions, taking sparsity into account. Previously, this method called
the size-estimation primitive with the number of non-zeros instead of
the sparsity, which led to dense estimates.

Furthermore, this patch fixes minor configuration issues with hand-coded
fused operators: the fused binary/unary-aggregate chain is now only selected
when operator fusion is enabled.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4f29b348
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4f29b348
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4f29b348

Branch: refs/heads/master
Commit: 4f29b3485f4eb8a58aebd41eef22c5d0f92d632f
Parents: 5b8d626
Author: Matthias Boehm <[email protected]>
Authored: Tue Oct 17 23:09:40 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Oct 17 23:09:40 2017 -0700

----------------------------------------------------------------------
 src/main/java/org/apache/sysml/hops/BinaryOp.java     | 14 ++++++++------
 .../java/org/apache/sysml/hops/OptimizerUtils.java    |  2 +-
 .../runtime/instructions/spark/RandSPInstruction.java |  2 +-
 3 files changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/4f29b348/src/main/java/org/apache/sysml/hops/BinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java 
b/src/main/java/org/apache/sysml/hops/BinaryOp.java
index 58bbc8f..76c1a64 100644
--- a/src/main/java/org/apache/sysml/hops/BinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java
@@ -1373,8 +1373,8 @@ public class BinaryOp extends Hop
 
        private static boolean requiresPartitioning( Hop rightInput )
        {
-               return (   rightInput.dimsKnown() //known input size 
-                && rightInput.getDim1()*rightInput.getDim2() > 
DistributedCacheInput.PARTITION_SIZE);
+               return ( rightInput.dimsKnown() //known input size 
+                       && rightInput.getDim1()*rightInput.getDim2() > 
DistributedCacheInput.PARTITION_SIZE);
        }
        
        public static boolean requiresReplication( Hop left, Hop right )
@@ -1393,9 +1393,10 @@ public class BinaryOp extends Hop
                long m1_cpb = left.getColsInBlock();
                
                //MR_BINARY_UAGG_CHAIN only applied if result is column/row 
vector of MV binary operation.
-               if( right instanceof AggUnaryOp && right.getInput().get(0) == 
left  //e.g., P / rowSums(P)
+               if( OptimizerUtils.ALLOW_OPERATOR_FUSION
+                       && right instanceof AggUnaryOp && 
right.getInput().get(0) == left  //e.g., P / rowSums(P)
                        && ((((AggUnaryOp) right).getDirection() == 
Direction.Row && m1_dim2 > 1 && m1_dim2 <= m1_cpb ) //single column block
-                   ||  (((AggUnaryOp) right).getDirection() == Direction.Col 
&& m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block
+                       || (((AggUnaryOp) right).getDirection() == 
Direction.Col && m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block
                {
                        return MMBinaryMethod.MR_BINARY_UAGG_CHAIN;
                }
@@ -1430,9 +1431,10 @@ public class BinaryOp extends Hop
                }
                
                //MR_BINARY_UAGG_CHAIN only applied if result is column/row 
vector of MV binary operation.
-               if( right instanceof AggUnaryOp && right.getInput().get(0) == 
left  //e.g., P / rowSums(P)
+               if( OptimizerUtils.ALLOW_OPERATOR_FUSION
+                       && right instanceof AggUnaryOp && 
right.getInput().get(0) == left  //e.g., P / rowSums(P)
                        && ((((AggUnaryOp) right).getDirection() == 
Direction.Row && m1_dim2 > 1 && m1_dim2 <= m1_cpb ) //single column block
-                   ||  (((AggUnaryOp) right).getDirection() == Direction.Col 
&& m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block
+                       || (((AggUnaryOp) right).getDirection() == 
Direction.Col && m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block
                {
                        return MMBinaryMethod.MR_BINARY_UAGG_CHAIN;
                }

http://git-wip-us.apache.org/repos/asf/systemml/blob/4f29b348/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index 5d831e5..d67e086 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -709,7 +709,7 @@ public class OptimizerUtils
                //check for guaranteed existence of empty blocks (less nnz than 
total number of blocks)
                long tnrblks = (long)Math.ceil((double)rlen/brlen);
                long tncblks = (long)Math.ceil((double)clen/bclen);
-               long nnz = (long) Math.ceil(sp * rlen * clen);          
+               long nnz = (long) Math.ceil(sp * rlen * clen);
                if( nnz < tnrblks * tncblks ) {
                        long lrlen = Math.min(rlen, brlen);
                        long lclen = Math.min(clen, bclen);

http://git-wip-us.apache.org/repos/asf/systemml/blob/4f29b348/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index b50bf73..2266eeb 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -350,7 +350,7 @@ public class RandSPInstruction extends UnarySPInstruction {
                LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rows, 
cols, rowsInBlock, colsInBlock, sparsity);
                PrimitiveIterator.OfLong nnzIter = nnz.iterator();
                double totalSize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity( rows, cols, rowsInBlock, 
-                       colsInBlock, rows*cols*sparsity); //overestimate for on 
disk, ensures hdfs block per partition
+                       colsInBlock, sparsity); //overestimate for on disk, 
ensures hdfs block per partition
                double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize();
                long numBlocks = new MatrixCharacteristics(rows, cols, 
rowsInBlock, colsInBlock).getNumBlocks();
                long numColBlocks = 
(long)Math.ceil((double)cols/(double)colsInBlock);

Reply via email to