2016-02-15 14:02 GMT+01:00 Sun, Rui <rui....@intel.com>: > On computation, RRDD launches one R process for each partition, so there > won't be a thread-safety issue > > Could you give more details on your new environment?
Running on EC2, I start the executors via /usr/bin/R CMD javareconf -e "/usr/lib/spark/sbin/start-master.sh" I invoke R via roughly object R { case class Element(value: Double) lazy val re = Option(REngine.getLastEngine()).getOrElse({ val eng = new JRI.JRIEngine() eng.parseAndEval(scala.io.Source.fromInputStream(this.getClass().getClassLoader().getResourceAsStream("r/fit.R")).mkString) eng }) def fit(curve: Seq[Element]): Option[Fitting] = { synchronized { val env = re.newEnvironment(null, false) re.assign("curve", new REXPDouble(curve.map(_.value).toArray), env) val df = re.parseAndEval("data.frame(curve=curve)", env, true) re.assign("df", df, env) val fitted = re.parseAndEval("fit(df)", env, true).asList if (fitted.keys == null) { None } else { val map = fitted.keys.map(key => (key, fitted.at(key).asDouble)).toMap Some(Fitting(map("values"))) } } } } where `fit` is wrapped in a UDAF. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org