Repository: systemml
Updated Branches:
  refs/heads/master 43138fad5 -> 5b2815206


[SYSTEMML-2200] Fix unnecessary shuffle in distributed codegen ops

This patch makes a general improvement to distributed codegen operations
on Spark. So far, the first input was always assumed to be an RDD and
the inputs to a fused operator were ordered accordingly. However, in
the presence of unknowns, there were scenarios with unnecessary RDD
inputs and thus, unnecessary shuffle due to distributed joins (with
large performance impact). We now decide on broadcasts in a more
generalized way. Furthermore, this also fixes the accounting of
currently pinned broadcasts, which was way too conservative in a parfor
context.

For example, on perftest Kmeans 10Mx1K with 10 concurrent runs, this
patch improved the end-to-end runtime from 9305s to 491s.
 

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5b281520
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5b281520
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5b281520

Branch: refs/heads/master
Commit: 5b2815206488d3af1f331e0f281d71d324b2b817
Parents: 43138fa
Author: Matthias Boehm <[email protected]>
Authored: Mon Mar 19 14:51:32 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Mar 19 15:54:08 2018 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheableData.java   |  6 +++-
 .../parfor/stat/InfrastructureAnalyzer.java     | 10 ++++++-
 .../instructions/spark/SpoofSPInstruction.java  | 30 ++++++++++++++------
 3 files changed, 35 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/5b281520/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index e2b9838..cdc0f03 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -38,6 +38,7 @@ import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer.RPolicy;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.runtime.instructions.cp.CPInstruction;
 import org.apache.sysml.runtime.instructions.cp.Data;
@@ -1275,7 +1276,10 @@ public abstract class CacheableData<T extends 
CacheBlock> extends Data
        }
        
        public static long getBroadcastSize() {
-               return _refBCs.longValue();
+               //scale the total sum of all broadcasts by the current fraction
+               //of local memory to equally distribute it across parfor workers
+               return (long) (_refBCs.longValue() *
+                       InfrastructureAnalyzer.getLocalMaxMemoryFraction());
        }
        
        // --------- STATIC CACHE INIT/CLEANUP OPERATIONS ----------

http://git-wip-us.apache.org/repos/asf/systemml/blob/5b281520/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
index 656089f..7ff0343 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
@@ -44,7 +44,8 @@ public class InfrastructureAnalyzer
        
        //static local master node properties
        private static int  _localPar        = -1;
-       private static long _localJVMMaxMem  = -1;
+       private static long _localJVMMaxMem  = -1; //mutable
+       private static long _fLocalJVMMaxMem = -1; //immutable
        
        //static hadoop cluster properties
        private static int  _remotePar       = -1;
@@ -137,6 +138,12 @@ public class InfrastructureAnalyzer
                _localJVMMaxMem = localMem;
        }
        
+       public static double getLocalMaxMemoryFraction() {
+               //since parfor modifies _localJVMMaxMem, some internal 
primitives
+               //need access to the current fraction of total local memory
+               return (double)_localJVMMaxMem / _fLocalJVMMaxMem;
+       }
+       
        /**
         * Gets the maximum memory [in bytes] of a hadoop map task JVM.
         * 
@@ -343,6 +350,7 @@ public class InfrastructureAnalyzer
                //step 1: basic parallelism and memory
                _localPar       = Runtime.getRuntime().availableProcessors();
                _localJVMMaxMem = Runtime.getRuntime().maxMemory();
+               _fLocalJVMMaxMem = _localJVMMaxMem;
        }
        
        /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/5b281520/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index edd7418..2f41fa7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.stream.IntStream;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -104,15 +105,16 @@ public class SpoofSPInstruction extends SPInstruction {
        @Override
        public void processInstruction(ExecutionContext ec)
                throws DMLRuntimeException 
-       {       
+       {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                
                //decide upon broadcast side inputs
                boolean[] bcVect = determineBroadcastInputs(sec, _in);
                boolean[] bcVect2 = getMatrixBroadcastVector(sec, _in, bcVect);
+               int main = getMainInputIndex(_in, bcVect);
                
                //create joined input rdd w/ replication if needed
-               MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(_in[0].getName());
+               MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(_in[main].getName());
                JavaPairRDD<MatrixIndexes, MatrixBlock[]> in = 
createJoinedInputRDD(
                        sec, _in, bcVect, (_class.getSuperclass() == 
SpoofOuterProduct.class));
                JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
@@ -120,7 +122,7 @@ public class SpoofSPInstruction extends SPInstruction {
                //create lists of input broadcasts and scalars
                ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices = new 
ArrayList<>();
                ArrayList<ScalarObject> scalars = new ArrayList<>();
-               for( int i=1; i<_in.length; i++ ) {
+               for( int i=0; i<_in.length; i++ ) {
                        if( _in[i].getDataType()==DataType.MATRIX && bcVect[i] 
) {
                                
bcMatrices.add(sec.getBroadcastForVariable(_in[i].getName()));
                        }
@@ -152,7 +154,7 @@ public class SpoofSPInstruction extends SPInstruction {
                                
                                //maintain lineage info and output 
characteristics
                                maintainLineageInfo(sec, _in, bcVect, _out);
-                               updateOutputMatrixCharacteristics(sec, op);     
+                               updateOutputMatrixCharacteristics(sec, op);
                        }
                        else { //SCALAR
                                out = in.mapPartitionsToPair(new 
CellwiseFunction(
@@ -250,7 +252,7 @@ public class SpoofSPInstruction extends SPInstruction {
                
                //decided for each matrix input if it fits into remaining memory
                //budget; the major input, i.e., inputs[0] is always an RDD
-               for( int i=1; i<inputs.length; i++ ) 
+               for( int i=0; i<inputs.length; i++ ) 
                        if( inputs[i].getDataType().isMatrix() ) {
                                MatrixCharacteristics mc = 
sec.getMatrixCharacteristics(inputs[i].getName());
                                double sizeL = 
OptimizerUtils.estimateSizeExactSparsity(mc);
@@ -261,6 +263,10 @@ public class SpoofSPInstruction extends SPInstruction {
                                bcBudget -= ret[i] ? sizeP : 0; //in remote 
block managers
                        }
                
+               //ensure there is at least one RDD input, with awareness for 
scalars
+               if( !IntStream.range(0, ret.length).anyMatch(i -> 
inputs[i].isMatrix() && !ret[i]) )
+                       ret[0] = false;
+               
                return ret;
        }
        
@@ -280,12 +286,13 @@ public class SpoofSPInstruction extends SPInstruction {
                throws DMLRuntimeException
        {
                //get input rdd for main input
-               MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(inputs[0].getName());
-               JavaPairRDD<MatrixIndexes, MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable(inputs[0].getName());
+               int main = getMainInputIndex(inputs, bcVect);
+               MatrixCharacteristics mcIn = 
sec.getMatrixCharacteristics(inputs[main].getName());
+               JavaPairRDD<MatrixIndexes, MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable(inputs[main].getName());
                JavaPairRDD<MatrixIndexes, MatrixBlock[]> ret = 
in.mapValues(new MapInputSignature());
                
-               for( int i=1; i<inputs.length; i++ )
-                       if( inputs[i].getDataType().isMatrix() && !bcVect[i] ) {
+               for( int i=0; i<inputs.length; i++ )
+                       if( i != main && inputs[i].getDataType().isMatrix() && 
!bcVect[i] ) {
                                //create side input rdd 
                                String varname = inputs[i].getName();
                                JavaPairRDD<MatrixIndexes, MatrixBlock> tmp = 
sec
@@ -315,6 +322,11 @@ public class SpoofSPInstruction extends SPInstruction {
                                sec.addLineage(output.getName(), 
inputs[i].getName(), bcVect[i]);
        }
        
+       private static int getMainInputIndex(CPOperand[] inputs, boolean[] 
bcVect) {
+               return IntStream.range(0, bcVect.length)
+                       .filter(i -> inputs[i].isMatrix() && 
!bcVect[i]).min().orElse(0);
+       }
+       
        private void updateOutputMatrixCharacteristics(SparkExecutionContext 
sec, SpoofOperator op) 
                throws DMLRuntimeException 
        {

Reply via email to