Repository: systemml Updated Branches: refs/heads/master 5b8d62659 -> 4f29b3485
[SYSTEMML-1967] Fix spark rand instruction (#partitions for sparse) This patch fixes the spark rand instruction so that it creates the correct number of partitions, taking sparsity into account. Previously, the instruction passed the number of non-zeros (rows*cols*sparsity) instead of the sparsity to the size-estimation primitive, which led to dense size estimates. Furthermore, this patch fixes minor configuration issues of hand-coded fused operators, which are now applied only if operator fusion is enabled (OptimizerUtils.ALLOW_OPERATOR_FUSION). Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4f29b348 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4f29b348 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4f29b348 Branch: refs/heads/master Commit: 4f29b3485f4eb8a58aebd41eef22c5d0f92d632f Parents: 5b8d626 Author: Matthias Boehm <[email protected]> Authored: Tue Oct 17 23:09:40 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Oct 17 23:09:40 2017 -0700 ---------------------------------------------------------------------- src/main/java/org/apache/sysml/hops/BinaryOp.java | 14 ++++++++------ .../java/org/apache/sysml/hops/OptimizerUtils.java | 2 +- .../runtime/instructions/spark/RandSPInstruction.java | 2 +- 3 files changed, 10 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/4f29b348/src/main/java/org/apache/sysml/hops/BinaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/BinaryOp.java b/src/main/java/org/apache/sysml/hops/BinaryOp.java index 58bbc8f..76c1a64 100644 --- a/src/main/java/org/apache/sysml/hops/BinaryOp.java +++ b/src/main/java/org/apache/sysml/hops/BinaryOp.java @@ -1373,8 +1373,8 @@ public class BinaryOp extends Hop private static boolean requiresPartitioning( Hop rightInput ) { - return ( rightInput.dimsKnown() //known input size - && 
rightInput.getDim1()*rightInput.getDim2() > DistributedCacheInput.PARTITION_SIZE); + return ( rightInput.dimsKnown() //known input size + && rightInput.getDim1()*rightInput.getDim2() > DistributedCacheInput.PARTITION_SIZE); } public static boolean requiresReplication( Hop left, Hop right ) @@ -1393,9 +1393,10 @@ public class BinaryOp extends Hop long m1_cpb = left.getColsInBlock(); //MR_BINARY_UAGG_CHAIN only applied if result is column/row vector of MV binary operation. - if( right instanceof AggUnaryOp && right.getInput().get(0) == left //e.g., P / rowSums(P) + if( OptimizerUtils.ALLOW_OPERATOR_FUSION + && right instanceof AggUnaryOp && right.getInput().get(0) == left //e.g., P / rowSums(P) && ((((AggUnaryOp) right).getDirection() == Direction.Row && m1_dim2 > 1 && m1_dim2 <= m1_cpb ) //single column block - || (((AggUnaryOp) right).getDirection() == Direction.Col && m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block + || (((AggUnaryOp) right).getDirection() == Direction.Col && m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block { return MMBinaryMethod.MR_BINARY_UAGG_CHAIN; } @@ -1430,9 +1431,10 @@ public class BinaryOp extends Hop } //MR_BINARY_UAGG_CHAIN only applied if result is column/row vector of MV binary operation. 
- if( right instanceof AggUnaryOp && right.getInput().get(0) == left //e.g., P / rowSums(P) + if( OptimizerUtils.ALLOW_OPERATOR_FUSION + && right instanceof AggUnaryOp && right.getInput().get(0) == left //e.g., P / rowSums(P) && ((((AggUnaryOp) right).getDirection() == Direction.Row && m1_dim2 > 1 && m1_dim2 <= m1_cpb ) //single column block - || (((AggUnaryOp) right).getDirection() == Direction.Col && m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block + || (((AggUnaryOp) right).getDirection() == Direction.Col && m1_dim1 > 1 && m1_dim1 <= m1_rpb ))) //single row block { return MMBinaryMethod.MR_BINARY_UAGG_CHAIN; } http://git-wip-us.apache.org/repos/asf/systemml/blob/4f29b348/src/main/java/org/apache/sysml/hops/OptimizerUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java index 5d831e5..d67e086 100644 --- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java @@ -709,7 +709,7 @@ public class OptimizerUtils //check for guaranteed existence of empty blocks (less nnz than total number of blocks) long tnrblks = (long)Math.ceil((double)rlen/brlen); long tncblks = (long)Math.ceil((double)clen/bclen); - long nnz = (long) Math.ceil(sp * rlen * clen); + long nnz = (long) Math.ceil(sp * rlen * clen); if( nnz < tnrblks * tncblks ) { long lrlen = Math.min(rlen, brlen); long lclen = Math.min(clen, bclen); http://git-wip-us.apache.org/repos/asf/systemml/blob/4f29b348/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java index b50bf73..2266eeb 100644 --- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java @@ -350,7 +350,7 @@ public class RandSPInstruction extends UnarySPInstruction { LongStream nnz = LibMatrixDatagen.computeNNZperBlock(rows, cols, rowsInBlock, colsInBlock, sparsity); PrimitiveIterator.OfLong nnzIter = nnz.iterator(); double totalSize = OptimizerUtils.estimatePartitionedSizeExactSparsity( rows, cols, rowsInBlock, - colsInBlock, rows*cols*sparsity); //overestimate for on disk, ensures hdfs block per partition + colsInBlock, sparsity); //overestimate for on disk, ensures hdfs block per partition double hdfsBlkSize = InfrastructureAnalyzer.getHDFSBlockSize(); long numBlocks = new MatrixCharacteristics(rows, cols, rowsInBlock, colsInBlock).getNumBlocks(); long numColBlocks = (long)Math.ceil((double)cols/(double)colsInBlock);
