Repository: systemml Updated Branches: refs/heads/master 43138fad5 -> 5b2815206
[SYSTEMML-2200] Fix unnecessary shuffle in distributed codegen ops This patch makes a general improvement to distributed codegen operations on spark. So far, the first input was always assumed to be an RDD and the inputs to a fused operator were ordered accordingly. However, in the presence of unknowns, there were scenarios with unnecessary RDD inputs and thus, unnecessary shuffle due to distributed joins (with large performance impact). We now decide on broadcasts in a more generalized way. Furthermore, this also fixes the accounting of currently pinned broadcasts, which was way too conservative in a parfor context. For example, on perftest Kmeans 10Mx1K with 10 concurrent runs, this patch improved the end-to-end runtime from 9305s to 491s. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5b281520 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5b281520 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5b281520 Branch: refs/heads/master Commit: 5b2815206488d3af1f331e0f281d71d324b2b817 Parents: 43138fa Author: Matthias Boehm <[email protected]> Authored: Mon Mar 19 14:51:32 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Mar 19 15:54:08 2018 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/CacheableData.java | 6 +++- .../parfor/stat/InfrastructureAnalyzer.java | 10 ++++++- .../instructions/spark/SpoofSPInstruction.java | 30 ++++++++++++++------ 3 files changed, 35 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/5b281520/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index e2b9838..cdc0f03 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -38,6 +38,7 @@ import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.LazyWriteBuffer.RPolicy; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.runtime.instructions.cp.CPInstruction; import org.apache.sysml.runtime.instructions.cp.Data; @@ -1275,7 +1276,10 @@ public abstract class CacheableData<T extends CacheBlock> extends Data } public static long getBroadcastSize() { - return _refBCs.longValue(); + //scale the total sum of all broadcasts by the current fraction + //of local memory to equally distribute it across parfor workers + return (long) (_refBCs.longValue() * + InfrastructureAnalyzer.getLocalMaxMemoryFraction()); } // --------- STATIC CACHE INIT/CLEANUP OPERATIONS ---------- http://git-wip-us.apache.org/repos/asf/systemml/blob/5b281520/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java index 656089f..7ff0343 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java @@ -44,7 +44,8 @@ public class InfrastructureAnalyzer //static local 
master node properties private static int _localPar = -1; - private static long _localJVMMaxMem = -1; + private static long _localJVMMaxMem = -1; //mutable + private static long _fLocalJVMMaxMem = -1; //immutable //static hadoop cluster properties private static int _remotePar = -1; @@ -137,6 +138,12 @@ public class InfrastructureAnalyzer _localJVMMaxMem = localMem; } + public static double getLocalMaxMemoryFraction() { + //since parfor modifies _localJVMMaxMem, some internal primitives + //need access to the current fraction of total local memory + return (double)_localJVMMaxMem / _fLocalJVMMaxMem; + } + /** * Gets the maximum memory [in bytes] of a hadoop map task JVM. * @@ -343,6 +350,7 @@ public class InfrastructureAnalyzer //step 1: basic parallelism and memory _localPar = Runtime.getRuntime().availableProcessors(); _localJVMMaxMem = Runtime.getRuntime().maxMemory(); + _fLocalJVMMaxMem = _localJVMMaxMem; } /** http://git-wip-us.apache.org/repos/asf/systemml/blob/5b281520/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java index edd7418..2f41fa7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.stream.IntStream; import org.apache.commons.lang3.ArrayUtils; import org.apache.spark.api.java.JavaPairRDD; @@ -104,15 +105,16 @@ public class SpoofSPInstruction extends SPInstruction { @Override public void processInstruction(ExecutionContext ec) throws DMLRuntimeException - { + { SparkExecutionContext sec = 
(SparkExecutionContext)ec; //decide upon broadcast side inputs boolean[] bcVect = determineBroadcastInputs(sec, _in); boolean[] bcVect2 = getMatrixBroadcastVector(sec, _in, bcVect); + int main = getMainInputIndex(_in, bcVect); //create joined input rdd w/ replication if needed - MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(_in[0].getName()); + MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(_in[main].getName()); JavaPairRDD<MatrixIndexes, MatrixBlock[]> in = createJoinedInputRDD( sec, _in, bcVect, (_class.getSuperclass() == SpoofOuterProduct.class)); JavaPairRDD<MatrixIndexes, MatrixBlock> out = null; @@ -120,7 +122,7 @@ public class SpoofSPInstruction extends SPInstruction { //create lists of input broadcasts and scalars ArrayList<PartitionedBroadcast<MatrixBlock>> bcMatrices = new ArrayList<>(); ArrayList<ScalarObject> scalars = new ArrayList<>(); - for( int i=1; i<_in.length; i++ ) { + for( int i=0; i<_in.length; i++ ) { if( _in[i].getDataType()==DataType.MATRIX && bcVect[i] ) { bcMatrices.add(sec.getBroadcastForVariable(_in[i].getName())); } @@ -152,7 +154,7 @@ public class SpoofSPInstruction extends SPInstruction { //maintain lineage info and output characteristics maintainLineageInfo(sec, _in, bcVect, _out); - updateOutputMatrixCharacteristics(sec, op); + updateOutputMatrixCharacteristics(sec, op); } else { //SCALAR out = in.mapPartitionsToPair(new CellwiseFunction( @@ -250,7 +252,7 @@ public class SpoofSPInstruction extends SPInstruction { //decided for each matrix input if it fits into remaining memory //budget; the major input, i.e., inputs[0] is always an RDD - for( int i=1; i<inputs.length; i++ ) + for( int i=0; i<inputs.length; i++ ) if( inputs[i].getDataType().isMatrix() ) { MatrixCharacteristics mc = sec.getMatrixCharacteristics(inputs[i].getName()); double sizeL = OptimizerUtils.estimateSizeExactSparsity(mc); @@ -261,6 +263,10 @@ public class SpoofSPInstruction extends SPInstruction { bcBudget -= ret[i] ? 
sizeP : 0; //in remote block managers } + //ensure there is at least one RDD input, with awareness for scalars + if( !IntStream.range(0, ret.length).anyMatch(i -> inputs[i].isMatrix() && !ret[i]) ) + ret[0] = false; + return ret; } @@ -280,12 +286,13 @@ public class SpoofSPInstruction extends SPInstruction { throws DMLRuntimeException { //get input rdd for main input - MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(inputs[0].getName()); - JavaPairRDD<MatrixIndexes, MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(inputs[0].getName()); + int main = getMainInputIndex(inputs, bcVect); + MatrixCharacteristics mcIn = sec.getMatrixCharacteristics(inputs[main].getName()); + JavaPairRDD<MatrixIndexes, MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(inputs[main].getName()); JavaPairRDD<MatrixIndexes, MatrixBlock[]> ret = in.mapValues(new MapInputSignature()); - for( int i=1; i<inputs.length; i++ ) - if( inputs[i].getDataType().isMatrix() && !bcVect[i] ) { + for( int i=0; i<inputs.length; i++ ) + if( i != main && inputs[i].getDataType().isMatrix() && !bcVect[i] ) { //create side input rdd String varname = inputs[i].getName(); JavaPairRDD<MatrixIndexes, MatrixBlock> tmp = sec @@ -315,6 +322,11 @@ public class SpoofSPInstruction extends SPInstruction { sec.addLineage(output.getName(), inputs[i].getName(), bcVect[i]); } + private static int getMainInputIndex(CPOperand[] inputs, boolean[] bcVect) { + return IntStream.range(0, bcVect.length) + .filter(i -> inputs[i].isMatrix() && !bcVect[i]).min().orElse(0); + } + private void updateOutputMatrixCharacteristics(SparkExecutionContext sec, SpoofOperator op) throws DMLRuntimeException {
