[SYSTEMML-2173] Propagation of upper bound nnz over spark instructions

Due to lazy evaluation, many Spark instructions only update the output
number of non-zeros if it can be exactly inferred from the inputs (e.g.,
for reorg operations). This is especially problematic for ultra-sparse
data sets, where treating an intermediate as dense can lead to severe
performance issues due to conservative plans and fallbacks such as
guarded collect. This patch improves the nnz handling by propagating
worst-case upper bounds at the runtime level (similar to the propagation
at the compiler level). Furthermore, the patch exploits these bounds in
the guarded collect path to avoid unnecessary dense/sparse flips and
HDFS I/O.



Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/340c99d2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/340c99d2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/340c99d2

Branch: refs/heads/master
Commit: 340c99d26d37b48b67d36402f45715b7e4bf8d85
Parents: 5b9cb15
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Wed Mar 7 13:13:16 2018 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Wed Mar 7 14:58:36 2018 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |  2 +-
 .../controlprogram/caching/MatrixObject.java    |  6 ++---
 .../spark/MatrixIndexingSPInstruction.java      |  1 +
 .../spark/MatrixReshapeSPInstruction.java       |  4 +++-
 .../runtime/matrix/MatrixCharacteristics.java   | 24 +++++++++++++++++---
 5 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/340c99d2/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index b8150f8..f21fe46 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -472,7 +472,7 @@ public class OptimizerUtils
                                mc.getCols(),
                                mc.getRowsPerBlock(),
                                mc.getColsPerBlock(),
-                               mc.getNonZeros(), memPinned);
+                               mc.getNonZerosBound(), memPinned);
        }
        
        public static boolean checkSparkCollectMemoryBudget( long rlen, long 
clen, int brlen, int bclen, long nnz, long memPinned )

http://git-wip-us.apache.org/repos/asf/systemml/blob/340c99d2/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index e744ecb..d0d7347 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -472,7 +472,7 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                        int clen = (int)mc.getCols();
                        int brlen = (int)mc.getRowsPerBlock();
                        int bclen = (int)mc.getColsPerBlock();
-                       long nnz = mc.getNonZeros();
+                       long nnz = mc.getNonZerosBound();
                        
                        //guarded rdd collect 
                        if( ii == InputInfo.BinaryBlockInputInfo && //guarded 
collect not for binary cell
@@ -492,11 +492,11 @@ public class MatrixObject extends 
CacheableData<MatrixBlock>
                        }
                        else if( ii == InputInfo.BinaryCellInputInfo ) {
                                //collect matrix block from binary block RDD
-                               mb = SparkExecutionContext.toMatrixBlock(lrdd, 
rlen, clen, nnz);                
+                               mb = SparkExecutionContext.toMatrixBlock(lrdd, 
rlen, clen, nnz);
                        }
                        else {
                                //collect matrix block from binary cell RDD
-                               mb = SparkExecutionContext.toMatrixBlock(lrdd, 
rlen, clen, brlen, bclen, nnz);  
+                               mb = SparkExecutionContext.toMatrixBlock(lrdd, 
rlen, clen, brlen, bclen, nnz);
                        }
                }
                catch(DMLRuntimeException ex) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/340c99d2/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index d1e02ca..b6df075 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -99,6 +99,7 @@ public class MatrixIndexingSPInstruction extends 
IndexingSPInstruction {
                        MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(input1.getName());
                        MatrixCharacteristics mcOut = 
sec.getMatrixCharacteristics(output.getName());
                        mcOut.set(ru-rl+1, cu-cl+1, mcIn.getRowsPerBlock(), 
mcIn.getColsPerBlock());
+                       mcOut.setNonZerosBound(Math.min(mcOut.getLength(), 
mcIn.getNonZerosBound()));
                        checkValidOutputDimensions(mcOut);
                        
                        //execute right indexing operation 
(partitioning-preserving if possible)

http://git-wip-us.apache.org/repos/asf/systemml/blob/340c99d2/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
index 65b7bb7..5a80aac 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
@@ -92,7 +92,9 @@ public class MatrixReshapeSPInstruction extends 
UnarySPInstruction {
                MatrixCharacteristics mcOut = sec.getMatrixCharacteristics( 
output.getName() );
                
                //update output characteristics and sanity check
-               mcOut.set(rows, cols, mcIn.getRowsPerBlock(), 
mcIn.getColsPerBlock());
+               mcOut.set(rows, cols, mcIn.getRowsPerBlock(), 
mcIn.getColsPerBlock(), mcIn.getNonZeros());
+               if( !mcIn.nnzKnown() )
+                       mcOut.setNonZerosBound(mcIn.getNonZerosBound());
                if( mcIn.getRows()*mcIn.getCols() != 
mcOut.getRows()*mcOut.getCols() ) {
                        throw new DMLRuntimeException("Incompatible matrix 
characteristics for reshape: "
                                + mcIn.getRows()+"x"+mcIn.getCols()+" vs 
"+mcOut.getRows()+"x"+mcOut.getCols());

http://git-wip-us.apache.org/repos/asf/systemml/blob/340c99d2/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
index 6f73de1..7766162 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/MatrixCharacteristics.java
@@ -76,6 +76,7 @@ public class MatrixCharacteristics implements Serializable
        private int numRowsPerBlock = 1;
        private int numColumnsPerBlock = 1;
        private long nonZero = -1;
+       private boolean ubNnz = false;
        
        public MatrixCharacteristics() {
        
@@ -106,6 +107,7 @@ public class MatrixCharacteristics implements Serializable
                numRowsPerBlock = bnr;
                numColumnsPerBlock = bnc;
                nonZero = nnz;
+               ubNnz = false;
        }
        
        public void set(MatrixCharacteristics that) {
@@ -114,6 +116,7 @@ public class MatrixCharacteristics implements Serializable
                numRowsPerBlock = that.numRowsPerBlock;
                numColumnsPerBlock = that.numColumnsPerBlock;
                nonZero = that.nonZero;
+               ubNnz = that.ubNnz;
        }
        
        public long getRows(){
@@ -124,6 +127,10 @@ public class MatrixCharacteristics implements Serializable
                return numColumns;
        }
        
+       public long getLength() {
+               return numRows * numColumns;
+       }
+       
        public int getRowsPerBlock() {
                return numRowsPerBlock;
        }
@@ -156,7 +163,7 @@ public class MatrixCharacteristics implements Serializable
        
        @Override
        public String toString() {
-               return "["+numRows+" x "+numColumns+", nnz="+nonZero
+               return "["+numRows+" x "+numColumns+", nnz="+nonZero+" 
("+ubNnz+")"
                +", blocks ("+numRowsPerBlock+" x "+numColumnsPerBlock+")]";
        }
        
@@ -175,10 +182,20 @@ public class MatrixCharacteristics implements Serializable
        }
        
        public void setNonZeros(long nnz) {
+               ubNnz = false;
                nonZero = nnz;
        }
        
        public long getNonZeros() {
+               return !ubNnz ? nonZero : -1;
+       }
+       
+       public void setNonZerosBound(long nnz) {
+               ubNnz = true;
+               nonZero = nnz;
+       }
+       
+       public long getNonZerosBound() {
                return nonZero;
        }
        
@@ -187,7 +204,8 @@ public class MatrixCharacteristics implements Serializable
        }
        
        public boolean dimsKnown(boolean includeNnz) {
-               return ( numRows >= 0 && numColumns >= 0 && (!includeNnz || 
nonZero >= 0));
+               return ( numRows >= 0 && numColumns >= 0
+                       && (!includeNnz || nnzKnown()));
        }
        
        public boolean rowsKnown() {
@@ -199,7 +217,7 @@ public class MatrixCharacteristics implements Serializable
        }
        
        public boolean nnzKnown() {
-               return ( nonZero >= 0 );
+               return ( !ubNnz && nonZero >= 0 );
        }
        
        public boolean mightHaveEmptyBlocks() {

Reply via email to