2016-02-15 14:02 GMT+01:00 Sun, Rui <rui....@intel.com>:
> On computation, RRDD launches one R process for each partition, so there
> won't be any thread-safety issues.
>
> Could you give more details on your new environment?

Running on EC2, I start the executors via

     /usr/bin/R CMD javareconf -e "/usr/lib/spark/sbin/start-master.sh"

and I invoke R from the executors roughly like this:

    import org.rosuda.REngine.{REngine, REXPDouble}
    import org.rosuda.REngine.JRI.JRIEngine

    object R {
      case class Element(value: Double)

      // Reuse an engine already attached to this JVM, otherwise start one
      // and source fit.R (bundled as a resource) into it once.
      lazy val re = Option(REngine.getLastEngine()).getOrElse {
        val eng = new JRIEngine()
        val script = scala.io.Source.fromInputStream(
          getClass.getClassLoader.getResourceAsStream("r/fit.R")).mkString
        eng.parseAndEval(script)
        eng
      }

      // Fitting is defined elsewhere in the project.
      def fit(curve: Seq[Element]): Option[Fitting] = {
        // JRI is not thread-safe, so every call into R is serialized here.
        synchronized {
          // Evaluate inside a fresh R environment so bindings don't leak
          // between calls.
          val env = re.newEnvironment(null, false)
          re.assign("curve", new REXPDouble(curve.map(_.value).toArray), env)
          val df = re.parseAndEval("data.frame(curve=curve)", env, true)
          re.assign("df", df, env)
          val fitted = re.parseAndEval("fit(df)", env, true).asList
          if (fitted.keys == null) {
            None
          } else {
            val map = fitted.keys.map(key => (key, fitted.at(key).asDouble)).toMap
            Some(Fitting(map("values")))
          }
        }
      }
    }

where `fit` is wrapped in a UDAF.
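
Roughly, the UDAF looks like the sketch below (simplified; the class name
FitCurve and the single `values` field on Fitting are just placeholders
here). It buffers a group's doubles as an array and calls R.fit once per
group in evaluate:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    // Simplified wrapper: buffers the group's values, calls R.fit once in evaluate.
    class FitCurve extends UserDefinedAggregateFunction {
      def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
      def bufferSchema: StructType = StructType(StructField("curve", ArrayType(DoubleType)) :: Nil)
      def dataType: DataType = DoubleType
      def deterministic: Boolean = true

      def initialize(buffer: MutableAggregationBuffer): Unit =
        buffer(0) = Seq.empty[Double]

      def update(buffer: MutableAggregationBuffer, input: Row): Unit =
        buffer(0) = buffer.getSeq[Double](0) :+ input.getDouble(0)

      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)

      // The call into R happens on the executor, once per group.
      def evaluate(buffer: Row): Any =
        R.fit(buffer.getSeq[Double](0).map(R.Element(_)))
          .map(f => f.values: java.lang.Double)
          .orNull
    }

It is then applied per group, e.g. df.groupBy("id").agg(new FitCurve()(df("value"))),
so the R call runs on whichever executor holds that group.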
