Repository: systemml
Updated Branches:
  refs/heads/master afbe7bf2f -> 1fa8e126f


[SYSTEMML-2163] Performance of large partitioned Spark broadcasts (>2GB)

This patch makes two minor performance improvements for large (i.e.,
partitioned) Spark broadcasts. Due to the 2GB size limit of Spark
broadcasts, we use partitioned broadcasts: large side inputs are split
into multiple broadcast chunks, and a wrapper abstraction hides this
underlying storage from the Spark instructions.

With this patch, (1) we use more aggressive chunking, closer to the 2GB
limit, which reduces the number of broadcasts while still ensuring that
no individual chunk ever exceeds the limit, and (2) we redundantly
maintain the metadata (matrix characteristics) in the wrapper object,
which avoids unnecessary fetches of the first broadcast chunk, b[0],
when not all chunks are required by all executors.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1fa8e126
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1fa8e126
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1fa8e126

Branch: refs/heads/master
Commit: 1fa8e126f92072b8207c77d96c2afb4793783cdc
Parents: afbe7bf
Author: Matthias Boehm <[email protected]>
Authored: Wed Feb 28 21:30:16 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Wed Feb 28 21:30:16 2018 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  4 +-
 .../spark/data/PartitionedBroadcast.java        | 39 ++++++++++----------
 2 files changed, 21 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/1fa8e126/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index a76b39b..91c577b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -568,7 +568,7 @@ public class SparkExecutionContext extends ExecutionContext
                                        pmb.clearBlocks();
                        }
                        
-                       bret = new PartitionedBroadcast<>(ret);
+                       bret = new PartitionedBroadcast<>(ret, 
mo.getMatrixCharacteristics());
                        BroadcastObject<MatrixBlock> bchandle = new 
BroadcastObject<>(bret,
                                        
OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics()));
                        mo.setBroadcastHandle(bchandle);
@@ -638,7 +638,7 @@ public class SparkExecutionContext extends ExecutionContext
                                        pmb.clearBlocks();
                        }
 
-                       bret = new PartitionedBroadcast<>(ret);
+                       bret = new PartitionedBroadcast<>(ret, 
fo.getMatrixCharacteristics());
                        BroadcastObject<FrameBlock> bchandle = new 
BroadcastObject<>(bret,
                                        
OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics()));
                        fo.setBroadcastHandle(bchandle);

http://git-wip-us.apache.org/repos/asf/systemml/blob/1fa8e126/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
index 4c4766b..ef5395b 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java
@@ -25,30 +25,33 @@ import org.apache.spark.broadcast.Broadcast;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 
 /**
  * This class is a wrapper around an array of broadcasts of partitioned 
matrix/frame blocks,
  * which is required due to 2GB limitations of Spark's broadcast handling. 
Without this
  * partitioning of {@code Broadcast<PartitionedBlock>} into {@code 
Broadcast<PartitionedBlock>[]},
  * we got java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE 
issue.
- * Despite various jiras, this issue still showed up in Spark 1.4/1.5. 
+ * Despite various jiras, this issue still showed up in Spark 2.1. 
  * 
  */
 public class PartitionedBroadcast<T extends CacheBlock> implements Serializable
 {
        private static final long serialVersionUID = 7041959166079438401L;
 
-       protected static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M 
cells ~ 1.6GB 
+       //note: that each block (max 240 * 1024) also requires some header space
+       protected static final long BROADCAST_PARTSIZE = 240L*1024*1024; //250M 
cells > 1.875GB 
        
        private Broadcast<PartitionedBlock<T>>[] _pbc = null;
+       private MatrixCharacteristics _mc;
        
        public PartitionedBroadcast() {
                //do nothing (required for Externalizable)
        }
        
-       public PartitionedBroadcast(Broadcast<PartitionedBlock<T>>[] broadcasts)
-       {
+       public PartitionedBroadcast(Broadcast<PartitionedBlock<T>>[] 
broadcasts, MatrixCharacteristics mc) {
                _pbc = broadcasts;
+               _mc = mc;
        }
        
        public Broadcast<PartitionedBlock<T>>[] getBroadcasts() {
@@ -56,48 +59,44 @@ public class PartitionedBroadcast<T extends CacheBlock> 
implements Serializable
        }
        
        public long getNumRows() {
-               return _pbc[0].value().getNumRows();
+               return _mc.getRows();
        }
        
        public long getNumCols() {
-               return _pbc[0].value().getNumCols();
+               return _mc.getCols();
        }
 
        public int getNumRowBlocks() {
-               return _pbc[0].value().getNumRowBlocks();
+               return (int)_mc.getNumRowBlocks();
        }
        
        public int getNumColumnBlocks() {
-               return _pbc[0].value().getNumColumnBlocks();
+               return (int)_mc.getNumColBlocks();
        }
 
        public static int computeBlocksPerPartition(long rlen, long clen, long 
brlen, long bclen) {
-               return (int) Math.floor( BROADCAST_PARTSIZE /  
-                               Math.min(rlen, brlen) / Math.min(clen, bclen));
+               return (int) Math.floor( BROADCAST_PARTSIZE /
+                       Math.min(rlen, brlen) / Math.min(clen, bclen));
        }
 
        public T getBlock(int rowIndex, int colIndex) 
                throws DMLRuntimeException 
        {
                int pix = 0;
-               
-               if( _pbc.length > 1 ) { 
-                       //compute partition index
-                       PartitionedBlock<T> tmp = _pbc[0].value();
-                       int numPerPart = 
computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(), 
-                                       tmp.getNumRowsPerBlock(), 
tmp.getNumColumnsPerBlock());
-                       int ix = 
(rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1);
+               if( _pbc.length > 1 ) { //compute partition index
+                       int numPerPart = 
computeBlocksPerPartition(_mc.getRows(),
+                               _mc.getCols(),_mc.getRowsPerBlock(), 
_mc.getColsPerBlock());
+                       int ix = (rowIndex-1)*getNumColumnBlocks()+(colIndex-1);
                        pix = ix / numPerPart;
                }
-                       
+               
                return _pbc[pix].value().getBlock(rowIndex, colIndex);
        }
        
        public T slice(long rl, long ru, long cl, long cu, T block) 
-                       throws DMLRuntimeException 
+               throws DMLRuntimeException 
        {
                T ret = null;
-               
                for( Broadcast<PartitionedBlock<T>> bc : _pbc ) {
                        PartitionedBlock<T> pm = bc.value();
                        T tmp = pm.slice(rl, ru, cl, cu, block);

Reply via email to