Repository: systemml Updated Branches: refs/heads/master afbe7bf2f -> 1fa8e126f
[SYSTEMML-2163] Performance large partitioned spark broadcasts >2GB This patch makes two minor performance improvements for large, i.e., partitioned spark broadcasts. Due to the 2GB limitation for spark broadcasts, we use partitioned broadcasts, where large side inputs are chunked up into multiple broadcasts along with an abstraction that hides this underlying storage from the spark instructions. With this patch, (1) we use a more aggressive chunking closer to the 2GB limit which reduces the number of broadcasts but still ensures that we never exceed the limit, and (2) we redundantly maintain the meta data in the wrapper object, which avoids unnecessary broadcast fetches of b[0] if not all chunks are required by all executors. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1fa8e126 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1fa8e126 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1fa8e126 Branch: refs/heads/master Commit: 1fa8e126f92072b8207c77d96c2afb4793783cdc Parents: afbe7bf Author: Matthias Boehm <[email protected]> Authored: Wed Feb 28 21:30:16 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Wed Feb 28 21:30:16 2018 -0800 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 4 +- .../spark/data/PartitionedBroadcast.java | 39 ++++++++++---------- 2 files changed, 21 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/1fa8e126/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 
a76b39b..91c577b 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -568,7 +568,7 @@ public class SparkExecutionContext extends ExecutionContext pmb.clearBlocks(); } - bret = new PartitionedBroadcast<>(ret); + bret = new PartitionedBroadcast<>(ret, mo.getMatrixCharacteristics()); BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<>(bret, OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics())); mo.setBroadcastHandle(bchandle); @@ -638,7 +638,7 @@ public class SparkExecutionContext extends ExecutionContext pmb.clearBlocks(); } - bret = new PartitionedBroadcast<>(ret); + bret = new PartitionedBroadcast<>(ret, fo.getMatrixCharacteristics()); BroadcastObject<FrameBlock> bchandle = new BroadcastObject<>(bret, OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics())); fo.setBroadcastHandle(bchandle); http://git-wip-us.apache.org/repos/asf/systemml/blob/1fa8e126/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java index 4c4766b..ef5395b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBroadcast.java @@ -25,30 +25,33 @@ import org.apache.spark.broadcast.Broadcast; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; /** * This class is a 
wrapper around an array of broadcasts of partitioned matrix/frame blocks, * which is required due to 2GB limitations of Spark's broadcast handling. Without this * partitioning of {@code Broadcast<PartitionedBlock>} into {@code Broadcast<PartitionedBlock>[]}, * we got java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE issue. - * Despite various jiras, this issue still showed up in Spark 1.4/1.5. + * Despite various jiras, this issue still showed up in Spark 2.1. * */ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable { private static final long serialVersionUID = 7041959166079438401L; - protected static final long BROADCAST_PARTSIZE = 200L*1024*1024; //200M cells ~ 1.6GB + //note that each block (max 240 * 1024) also requires some header space + protected static final long BROADCAST_PARTSIZE = 240L*1024*1024; //240M cells ~ 1.875GB private Broadcast<PartitionedBlock<T>>[] _pbc = null; + private MatrixCharacteristics _mc; public PartitionedBroadcast() { //do nothing (required for Externalizable) } - public PartitionedBroadcast(Broadcast<PartitionedBlock<T>>[] broadcasts) - { + public PartitionedBroadcast(Broadcast<PartitionedBlock<T>>[] broadcasts, MatrixCharacteristics mc) { _pbc = broadcasts; + _mc = mc; } public Broadcast<PartitionedBlock<T>>[] getBroadcasts() { @@ -56,48 +59,44 @@ public class PartitionedBroadcast<T extends CacheBlock> implements Serializable } public long getNumRows() { - return _pbc[0].value().getNumRows(); + return _mc.getRows(); } public long getNumCols() { - return _pbc[0].value().getNumCols(); + return _mc.getCols(); } public int getNumRowBlocks() { - return _pbc[0].value().getNumRowBlocks(); + return (int)_mc.getNumRowBlocks(); } public int getNumColumnBlocks() { - return _pbc[0].value().getNumColumnBlocks(); + return (int)_mc.getNumColBlocks(); } public static int computeBlocksPerPartition(long rlen, long clen, long brlen, long bclen) { - return (int) Math.floor( BROADCAST_PARTSIZE / -
Math.min(rlen, brlen) / Math.min(clen, bclen)); + return (int) Math.floor( BROADCAST_PARTSIZE / + Math.min(rlen, brlen) / Math.min(clen, bclen)); } public T getBlock(int rowIndex, int colIndex) throws DMLRuntimeException { int pix = 0; - - if( _pbc.length > 1 ) { - //compute partition index - PartitionedBlock<T> tmp = _pbc[0].value(); - int numPerPart = computeBlocksPerPartition(tmp.getNumRows(), tmp.getNumCols(), - tmp.getNumRowsPerBlock(), tmp.getNumColumnsPerBlock()); - int ix = (rowIndex-1)*tmp.getNumColumnBlocks()+(colIndex-1); + if( _pbc.length > 1 ) { //compute partition index + int numPerPart = computeBlocksPerPartition(_mc.getRows(), + _mc.getCols(),_mc.getRowsPerBlock(), _mc.getColsPerBlock()); + int ix = (rowIndex-1)*getNumColumnBlocks()+(colIndex-1); pix = ix / numPerPart; } - + return _pbc[pix].value().getBlock(rowIndex, colIndex); } public T slice(long rl, long ru, long cl, long cu, T block) - throws DMLRuntimeException + throws DMLRuntimeException { T ret = null; - for( Broadcast<PartitionedBlock<T>> bc : _pbc ) { PartitionedBlock<T> pm = bc.value(); T tmp = pm.slice(rl, ru, cl, cu, block);
