Repository: mahout Updated Branches: refs/heads/master c2b077f0a -> ec10963a4
MAHOUT-1785: Replace 'spark.kryoserializer.buffer.mb' from Spark config with 'spark.kryoserializer.buffer' Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/ec10963a Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/ec10963a Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/ec10963a Branch: refs/heads/master Commit: ec10963a436fea7d5775482fb22acb16bb970f1d Parents: c2b077f Author: smarthi <[email protected]> Authored: Fri Nov 6 16:44:41 2015 -0500 Committer: smarthi <[email protected]> Committed: Sat Nov 7 23:53:58 2015 -0500 ---------------------------------------------------------------------- .../org/apache/mahout/drivers/MahoutSparkDriver.scala | 6 +++++- .../sparkbindings/test/DistributedSparkSuite.scala | 14 ++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/ec10963a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala index 40ffab3..3869830 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala @@ -75,7 +75,11 @@ abstract class MahoutSparkDriver extends MahoutDriver { override protected def start() : Unit = { if (!_useExistingContext) { sparkConf.set("spark.kryo.referenceTracking", "false") - .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option + .set("spark.kryoserializer.buffer.mb", "200m")// this is default for Mahout optimizer, change it with -D option + // the previous has been marked deprecated as of Spark 1.4 by the below line, + // remove the above line when Spark finally retires above for below + .set("spark.kryoserializer.buffer", "200m") + if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") sparkConf.set("spark.executor.memory", parser.opts("sparkExecutorMem").asInstanceOf[String]) http://git-wip-us.apache.org/repos/asf/mahout/blob/ec10963a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala index d917a22..a9dc874 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala @@ -18,12 +18,13 @@ package org.apache.mahout.sparkbindings.test import org.apache.log4j.{Level, Logger} -import org.scalatest.{ConfigMap, BeforeAndAfterAllConfigMap, Suite} -import org.apache.spark.SparkConf -import org.apache.mahout.sparkbindings._ -import org.apache.mahout.test.{DistributedMahoutSuite, MahoutSuite} import org.apache.mahout.math.drm.DistributedContext -import collection.JavaConversions._ +import org.apache.mahout.sparkbindings._ +import org.apache.mahout.test.DistributedMahoutSuite +import org.apache.spark.SparkConf +import org.scalatest.{ConfigMap, Suite} + +import scala.collection.JavaConversions._ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfiguration { this: Suite => @@ -39,7 +40,8 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat // Do not run MAHOUT_HOME jars in unit tests. addMahoutJars = !isLocal, sparkConf = new SparkConf() - .set("spark.kryoserializer.buffer.mb", "40") + .set("spark.kryoserializer.buffer.mb", "40m") + .set("spark.kryoserializer.buffer", "40m") .set("spark.akka.frameSize", "30") .set("spark.default.parallelism", "10") .set("spark.executor.memory", "2G")
