Repository: systemml Updated Branches: refs/heads/master 4ae6beee5 -> f59a2dc22
[SYSTEMML-2013] Fix robustness of Spark n-ary cbind and RPC configuration. This patch addresses a recent perftest issue with stratstats on the 80GB scenario. First, we now automatically set the maximum RPC message size to a more robust value of 512MB (as we have previously done manually), unless the user has already configured it: for Spark versions below 2.0.0 the key is 'spark.akka.frameSize', otherwise 'spark.rpc.message.maxSize'. Second, this patch improves the robustness of the recently added n-ary cbind by using a preferred number of output partitions for the final shuffle, which avoids overly large partitions when there are many inputs. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f59a2dc2 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f59a2dc2 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f59a2dc2 Branch: refs/heads/master Commit: f59a2dc22ecd4645d22a6055e9682a606672e839 Parents: 4ae6bee Author: Matthias Boehm <[email protected]> Authored: Sat Nov 11 22:10:24 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sat Nov 11 22:10:24 2017 -0800 ---------------------------------------------------------------------- .../controlprogram/context/SparkExecutionContext.java | 10 +++++++++- .../instructions/spark/BuiltinNarySPInstruction.java | 4 +++- 2 files changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 467b6fc..ff47b3a 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -261,7 +261,15 @@ public class SparkExecutionContext extends ExecutionContext if( !conf.contains("spark.locality.wait") ) { //default 3s conf.set("spark.locality.wait", "5s"); } - + + //increase max message size for robustness + String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION(); + String msgSizeConf = (UtilFunctions.compareVersion(sparkVersion, "2.0.0") < 0) ? + "spark.akka.frameSize" : "spark.rpc.message.maxSize"; + if( !conf.contains(msgSizeConf) ) { //default 128MB + conf.set(msgSizeConf, "512"); + } + return conf; } http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java index 4edb1cf..71d0003 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java @@ -28,6 +28,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction.ShiftMatrix; import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils; +import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; @@ -85,7 +86,8 @@ public class BuiltinNarySPInstruction extends SPInstruction } //aggregate partially overlapping blocks w/ single shuffle - out = 
RDDAggregateUtils.mergeByKey(out); + int numPartOut = SparkUtils.getNumPreferredPartitions(mcOut); + out = RDDAggregateUtils.mergeByKey(out, numPartOut, false); //set output RDD and add lineage sec.getMatrixCharacteristics(output.getName()).set(mcOut);
