Repository: systemml
Updated Branches:
  refs/heads/master 159522a1f -> 8a51003ec


[SYSTEMML-2197] Multi-threaded spark broadcast creation

Closes #757.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8a51003e
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8a51003e
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8a51003e

Branch: refs/heads/master
Commit: 8a51003ec9656f38f25e0e49d2180c49b2ffc6f7
Parents: 159522a
Author: EdgarLGB <[email protected]>
Authored: Sun Apr 8 15:10:47 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sun Apr 8 15:10:47 2018 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 40 ++++++++++----------
 .../spark/data/PartitionedBlock.java            | 25 ++++++------
 2 files changed, 31 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
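For readers skimming the diff below: the change swaps the sequential per-partition loops for java.util.Arrays.parallelSetAll, which fills each array slot from a generator function on the common fork/join pool. The following is a minimal, self-contained sketch of that pattern only; Part and createPart are hypothetical stand-ins, not SystemML classes:

import java.util.Arrays;

public class ParallelSetAllSketch {
	// Hypothetical partition type standing in for a partitioned broadcast.
	static final class Part {
		final int offset;
		final int numBlks;
		Part(int offset, int numBlks) { this.offset = offset; this.numBlks = numBlks; }
	}

	// Hypothetical helper playing the role of createPartitionedBroadcast(pmb, numPerPart, pos).
	static Part createPart(int totalBlks, int numPerPart, int pos) {
		int offset = pos * numPerPart;
		int numBlks = Math.min(numPerPart, totalBlks - offset);
		return new Part(offset, numBlks);
	}

	public static void main(String[] args) {
		int totalBlks = 10, numPerPart = 3;
		int numParts = (totalBlks + numPerPart - 1) / numPerPart; // ceiling division
		Part[] ret = new Part[numParts];
		// Each slot is computed independently, potentially on different
		// fork/join worker threads; this is the pattern the commit applies
		// to coarse-grained partitioned broadcasts.
		Arrays.parallelSetAll(ret, i -> createPart(totalBlks, numPerPart, i));
		for (Part p : ret)
			System.out.println("offset=" + p.offset + " numBlks=" + p.numBlks);
	}
}

Running the sketch prints one (offset, numBlks) pair per partition; in the commit itself, the generator additionally broadcasts each partition and clears its blocks when not running on a local master.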


http://git-wip-us.apache.org/repos/asf/systemml/blob/8a51003e/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 325a359..959cd76 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -52,6 +52,7 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.controlprogram.Program;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
@@ -552,19 +553,12 @@ public class SparkExecutionContext extends ExecutionContext
                        Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts];
 
                        //create coarse-grained partitioned broadcasts
-                       if( numParts > 1 ) {
-                               for( int i=0; i<numParts; i++ ) {
-                                       int offset = i * numPerPart;
-                                       int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
-                                       PartitionedBlock<MatrixBlock> tmp = pmb.createPartition(offset, numBlks, new MatrixBlock());
-                                       ret[i] = getSparkContext().broadcast(tmp);
-                                       if( !isLocalMaster() )
-                                               tmp.clearBlocks();
-                               }
-                       }
+                       if (numParts > 1) {
+                               Arrays.parallelSetAll(ret, i -> createPartitionedBroadcast(pmb, numPerPart, i));
+                       }
                        else { //single partition
                                ret[0] = getSparkContext().broadcast(pmb);
-                               if( !isLocalMaster() )
+                               if (!isLocalMaster())
                                        pmb.clearBlocks();
                        }
                        
@@ -621,19 +615,12 @@ public class SparkExecutionContext extends ExecutionContext
                        Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts];
 
                        //create coarse-grained partitioned broadcasts
-                       if( numParts > 1 ) {
-                               for( int i=0; i<numParts; i++ ) {
-                                       int offset = i * numPerPart;
-                                       int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
-                                       PartitionedBlock<FrameBlock> tmp = pmb.createPartition(offset, numBlks, new FrameBlock());
-                                       ret[i] = getSparkContext().broadcast(tmp);
-                                       if( !isLocalMaster() )
-                                               tmp.clearBlocks();
-                               }
+                       if (numParts > 1) {
+                               Arrays.parallelSetAll(ret, i -> createPartitionedBroadcast(pmb, numPerPart, i));
                        }
                        else { //single partition
                                ret[0] = getSparkContext().broadcast(pmb);
-                               if( !isLocalMaster() )
+                               if (!isLocalMaster())
                                        pmb.clearBlocks();
                        }
 
@@ -651,6 +638,17 @@ public class SparkExecutionContext extends ExecutionContext
 
                return bret;
        }
+       
+       private Broadcast<PartitionedBlock<? extends CacheBlock>> createPartitionedBroadcast(
+                       PartitionedBlock<? extends CacheBlock> pmb, int numPerPart, int pos) {
+               int offset = pos * numPerPart;
+               int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() - offset);
+               PartitionedBlock<? extends CacheBlock> tmp = pmb.createPartition(offset, numBlks);
+               Broadcast<PartitionedBlock<? extends CacheBlock>> ret = getSparkContext().broadcast(tmp);
+               if (!isLocalMaster())
+                       tmp.clearBlocks();
+               return ret;
+       }
 
        /**
         * Keep the output rdd of spark rdd operations as meta data of matrix/frame

http://git-wip-us.apache.org/repos/asf/systemml/blob/8a51003e/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index ba4660a..61eb5e9 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -29,6 +29,7 @@ import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
@@ -76,21 +77,19 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
                int ncblks = getNumColumnBlocks();
                int code = CacheBlockFactory.getCode(block);
                
-               try
-               {
+               try {
                        _partBlocks = new CacheBlock[nrblks * ncblks];
-                       for( int i=0, ix=0; i<nrblks; i++ )
-                               for( int j=0; j<ncblks; j++, ix++ ) {
-                                       T tmp = (T) CacheBlockFactory.newInstance(code);
-                                       block.slice(i*_brlen, Math.min((i+1)*_brlen, rlen)-1,
-                                                                  j*_bclen, Math.min((j+1)*_bclen, clen)-1, tmp);
-                                       _partBlocks[ix] = tmp;
-                               }
-               }
-               catch(Exception ex) {
+                       Arrays.parallelSetAll(_partBlocks, index -> {
+                               int i = index / ncblks;
+                               int j = index % ncblks;
+                               T tmp = (T) CacheBlockFactory.newInstance(code);
+                               return block.slice(i * _brlen, Math.min((i + 1) * _brlen, rlen) - 1,
+                                       j * _bclen, Math.min((j + 1) * _bclen, clen) - 1, tmp);
+                       });
+               } catch(Exception ex) {
                        throw new RuntimeException("Failed partitioning of broadcast variable input.", ex);
                }
-               
+
                _offset = 0;
        }
 
@@ -107,7 +106,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
                _partBlocks = new CacheBlock[nrblks * ncblks];
        }
 
-       public PartitionedBlock<T> createPartition( int offset, int numBlks, T block )
+       public PartitionedBlock<T> createPartition( int offset, int numBlks)
        {
                PartitionedBlock<T> ret = new PartitionedBlock<>();
                ret._rlen = _rlen;
