Repository: systemml Updated Branches: refs/heads/master 159522a1f -> 8a51003ec
[SYSTEMML-2197] Multi-threaded spark broadcast creation Closes #757. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8a51003e Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8a51003e Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8a51003e Branch: refs/heads/master Commit: 8a51003ec9656f38f25e0e49d2180c49b2ffc6f7 Parents: 159522a Author: EdgarLGB <[email protected]> Authored: Sun Apr 8 15:10:47 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sun Apr 8 15:10:47 2018 -0700 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 40 ++++++++++---------- .../spark/data/PartitionedBlock.java | 25 ++++++------ 2 files changed, 31 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/8a51003e/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 325a359..959cd76 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -52,6 +52,7 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.compress.CompressedMatrixBlock; import org.apache.sysml.runtime.controlprogram.Program; +import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject; @@ -552,19 +553,12 @@ public class SparkExecutionContext extends ExecutionContext Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts]; //create coarse-grained partitioned broadcasts - if( numParts > 1 ) { - for( int i=0; i<numParts; i++ ) { - int offset = i * numPerPart; - int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset); - PartitionedBlock<MatrixBlock> tmp = pmb.createPartition(offset, numBlks, new MatrixBlock()); - ret[i] = getSparkContext().broadcast(tmp); - if( !isLocalMaster() ) - tmp.clearBlocks(); - } - } + if (numParts > 1) { + Arrays.parallelSetAll(ret, i -> createPartitionedBroadcast(pmb, numPerPart, i)); + } else { //single partition ret[0] = getSparkContext().broadcast(pmb); - if( !isLocalMaster() ) + if (!isLocalMaster()) pmb.clearBlocks(); } @@ -621,19 +615,12 @@ public class SparkExecutionContext extends ExecutionContext Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts]; //create coarse-grained partitioned broadcasts - if( numParts > 1 ) { - for( int i=0; i<numParts; i++ ) { - int offset = i * numPerPart; - int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset); - PartitionedBlock<FrameBlock> tmp = pmb.createPartition(offset, numBlks, new FrameBlock()); - ret[i] = getSparkContext().broadcast(tmp); - if( !isLocalMaster() ) - tmp.clearBlocks(); - } + if (numParts > 1) { + Arrays.parallelSetAll(ret, i -> createPartitionedBroadcast(pmb, numPerPart, i)); } else { //single partition ret[0] = getSparkContext().broadcast(pmb); - if( !isLocalMaster() ) + if (!isLocalMaster()) pmb.clearBlocks(); } @@ -651,6 +638,17 @@ public class SparkExecutionContext extends ExecutionContext return bret; } + + private Broadcast<PartitionedBlock<? extends CacheBlock>> createPartitionedBroadcast( + PartitionedBlock<? 
extends CacheBlock> pmb, int numPerPart, int pos) { + int offset = pos * numPerPart; + int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() - offset); + PartitionedBlock<? extends CacheBlock> tmp = pmb.createPartition(offset, numBlks); + Broadcast<PartitionedBlock<? extends CacheBlock>> ret = getSparkContext().broadcast(tmp); + if (!isLocalMaster()) + tmp.clearBlocks(); + return ret; + } /** * Keep the output rdd of spark rdd operations as meta data of matrix/frame http://git-wip-us.apache.org/repos/asf/systemml/blob/8a51003e/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java index ba4660a..61eb5e9 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java @@ -29,6 +29,7 @@ import java.io.ObjectOutput; import java.io.ObjectOutputStream; import java.lang.reflect.Constructor; import java.util.ArrayList; +import java.util.Arrays; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; @@ -76,21 +77,19 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable int ncblks = getNumColumnBlocks(); int code = CacheBlockFactory.getCode(block); - try - { + try { _partBlocks = new CacheBlock[nrblks * ncblks]; - for( int i=0, ix=0; i<nrblks; i++ ) - for( int j=0; j<ncblks; j++, ix++ ) { - T tmp = (T) CacheBlockFactory.newInstance(code); - block.slice(i*_brlen, Math.min((i+1)*_brlen, rlen)-1, - j*_bclen, Math.min((j+1)*_bclen, clen)-1, tmp); - _partBlocks[ix] = tmp; - } - } - catch(Exception ex) { + Arrays.parallelSetAll(_partBlocks, index -> { + int 
i = index / ncblks; /* row-block index: blocks are linearized row-major (index = i*ncblks + j), matching the replaced nested loop */ + int j = index % ncblks; + T tmp = (T) CacheBlockFactory.newInstance(code); + return block.slice(i * _brlen, Math.min((i + 1) * _brlen, rlen) - 1, + j * _bclen, Math.min((j + 1) * _bclen, clen) - 1, tmp); + }); + } catch(Exception ex) { throw new RuntimeException("Failed partitioning of broadcast variable input.", ex); } - + _offset = 0; } @@ -107,7 +106,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable _partBlocks = new CacheBlock[nrblks * ncblks]; } - public PartitionedBlock<T> createPartition( int offset, int numBlks, T block ) + public PartitionedBlock<T> createPartition( int offset, int numBlks) { PartitionedBlock<T> ret = new PartitionedBlock<>(); ret._rlen = _rlen;
