Repository: systemml
Updated Branches:
  refs/heads/master 4ae6beee5 -> f59a2dc22


[SYSTEMML-2013] Fix robustness of Spark nary cbind and RPC configuration

This patch addresses a recent perftest issue with stratstats on the 80GB
scenario. First, unless the user has already set it, we now automatically
configure (depending on the Spark version) either 'spark.akka.frameSize'
(Spark < 2.0.0) or 'spark.rpc.message.maxSize' (Spark >= 2.0.0) to a more
robust value of 512MB, as we have done manually in the past. Second, this
patch makes the recently added nary cbind more robust by using a
preferred number of output partitions for the final shuffle, which
avoids large partitions in case of many inputs.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f59a2dc2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f59a2dc2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f59a2dc2

Branch: refs/heads/master
Commit: f59a2dc22ecd4645d22a6055e9682a606672e839
Parents: 4ae6bee
Author: Matthias Boehm <[email protected]>
Authored: Sat Nov 11 22:10:24 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Nov 11 22:10:24 2017 -0800

----------------------------------------------------------------------
 .../controlprogram/context/SparkExecutionContext.java     | 10 +++++++++-
 .../instructions/spark/BuiltinNarySPInstruction.java      |  4 +++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 467b6fc..ff47b3a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -261,7 +261,15 @@ public class SparkExecutionContext extends ExecutionContext
                if( !conf.contains("spark.locality.wait") ) { //default 3s
                        conf.set("spark.locality.wait", "5s");
                }
-
+               
+               //increase max message size for robustness
+               String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION();
+               String msgSizeConf = (UtilFunctions.compareVersion(sparkVersion, "2.0.0") < 0) ?
+                       "spark.akka.frameSize" : "spark.rpc.message.maxSize";
+               if( !conf.contains(msgSizeConf) ) { //default 128MB
+                       conf.set(msgSizeConf, "512");
+               }
+               
                return conf;
        }
        

http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
index 4edb1cf..71d0003 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
@@ -28,6 +28,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction.ShiftMatrix;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -85,7 +86,8 @@ public class BuiltinNarySPInstruction extends SPInstruction
                }
                
                //aggregate partially overlapping blocks w/ single shuffle
-               out = RDDAggregateUtils.mergeByKey(out);
+               int numPartOut = SparkUtils.getNumPreferredPartitions(mcOut);
+               out = RDDAggregateUtils.mergeByKey(out, numPartOut, false);
                
                //set output RDD and add lineage
                sec.getMatrixCharacteristics(output.getName()).set(mcOut);

Reply via email to