http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/SampleEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/controller/SampleEngine.scala b/core/src/test/scala/io/prediction/controller/SampleEngine.scala deleted file mode 100644 index 3a28ca9..0000000 --- a/core/src/test/scala/io/prediction/controller/SampleEngine.scala +++ /dev/null @@ -1,472 +0,0 @@ -package io.prediction.controller - -import io.prediction.controller.{Params => PIOParams} -import io.prediction.core._ - -import grizzled.slf4j.Logger -import io.prediction.workflow.WorkflowParams -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -object Engine0 { - @transient lazy val logger = Logger[this.type] - - case class TrainingData(id: Int, error: Boolean = false) extends SanityCheck { - def sanityCheck(): Unit = { - Predef.assert(!error, "Not Error") - } - } - - case class EvalInfo(id: Int) - case class ProcessedData(id: Int, td: TrainingData) - - case class Query(id: Int, ex: Int = 0, qx: Int = 0, supp: Boolean = false) - case class Actual(id: Int, ex: Int = 0, qx: Int = 0) - case class Prediction( - id: Int, q: Query, models: Option[Any] = None, - ps: Seq[Prediction] = Seq[Prediction]()) - - class PDataSource0(id: Int = 0) - extends PDataSource[TrainingData, EvalInfo, Query, Actual] { - def readTraining(sc: SparkContext): TrainingData = { - TrainingData(id) - } - } - - class PDataSource1(id: Int = 0, en: Int = 0, qn: Int = 0) - extends PDataSource[TrainingData, EvalInfo, Query, Actual] { - def readTraining(sc: SparkContext): TrainingData = TrainingData(id) - - override - def readEval(sc: SparkContext) - : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = { - (0 until en).map { ex => { - val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => { - (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) - }} - (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq)) - }} - } - } - - object PDataSource2 { - case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams - } - - class PDataSource2(params: PDataSource2.Params) - extends PDataSource[TrainingData, EvalInfo, Query, Actual] { - val id = params.id - def readTraining(sc: SparkContext): TrainingData = TrainingData(id) - - override - def readEval(sc: SparkContext) - : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = { - (0 until params.en).map { ex => { - val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => { - (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) - }} - (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq)) - }} - } - } - - class PDataSource3(id: Int = 0, error: Boolean = false) - extends PDataSource[TrainingData, EvalInfo, Query, Actual] { - def readTraining(sc: SparkContext): TrainingData = { - TrainingData(id = id, error = error) - } - } - - object PDataSource4 { - class Params(val id: Int, val en: Int = 0, val qn: Int = 0) - extends PIOParams - } - - class PDataSource4(params: PDataSource4.Params) - extends PDataSource[TrainingData, EvalInfo, Query, Actual] { - val id = params.id - def readTraining(sc: SparkContext): TrainingData = TrainingData(id) - - override - def readEval(sc: SparkContext) - : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = { - (0 until params.en).map { ex => { - val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => { - (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) - }} - (TrainingData(id), 
EvalInfo(id), sc.parallelize(qaSeq)) - }} - } - } - - class LDataSource0(id: Int, en: Int = 0, qn: Int = 0) - extends LDataSource[TrainingData, EvalInfo, Query, Actual] { - def readTraining(): TrainingData = TrainingData(id) - - override - def readEval() - : Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = { - (0 until en).map { ex => { - val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => { - (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) - }} - (TrainingData(id), EvalInfo(id), qaSeq) - }} - } - } - - object LDataSource1 { - case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams - } - - class LDataSource1(params: LDataSource1.Params) - extends LDataSource[TrainingData, EvalInfo, Query, Actual] { - val id = params.id - def readTraining(): TrainingData = TrainingData(id) - - override - def readEval(): Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = { - (0 until params.en).map { ex => { - val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => { - (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) - }} - (TrainingData(id), EvalInfo(id), qaSeq) - }} - } - } - - class PPreparator0(id: Int = 0) - extends PPreparator[TrainingData, ProcessedData] { - def prepare(sc: SparkContext, td: TrainingData): ProcessedData = { - ProcessedData(id, td) - } - } - - object PPreparator1 { - case class Params(id: Int = 0) extends PIOParams - } - - class PPreparator1(params: PPreparator1.Params) - extends PPreparator[TrainingData, ProcessedData] { - def prepare(sc: SparkContext, td: TrainingData): ProcessedData = { - ProcessedData(params.id, td) - } - } - - class LPreparator0(id: Int = 0) - extends LPreparator[TrainingData, ProcessedData] { - def prepare(td: TrainingData): ProcessedData = { - ProcessedData(id, td) - } - } - - object LPreparator1 { - case class Params(id: Int = 0) extends PIOParams - } - - class LPreparator1(params: LPreparator1.Params) - extends LPreparator[TrainingData, ProcessedData] { - def prepare(td: TrainingData): ProcessedData = { - ProcessedData(params.id, td) - } - } - - object PAlgo0 { - case class Model(id: Int, pd: ProcessedData) - } - - class PAlgo0(id: Int = 0) - extends PAlgorithm[ProcessedData, PAlgo0.Model, Query, Prediction] { - def train(sc: SparkContext, pd: ProcessedData) - : PAlgo0.Model = PAlgo0.Model(id, pd) - - override - def batchPredict(m: PAlgo0.Model, qs: RDD[(Long, Query)]) - : RDD[(Long, Prediction)] = { - qs.mapValues(q => Prediction(id, q, Some(m))) - } - - def predict(m: PAlgo0.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object PAlgo1 { - case class Model(id: Int, pd: ProcessedData) - } - - class PAlgo1(id: Int = 0) - extends PAlgorithm[ProcessedData, PAlgo1.Model, Query, Prediction] { - def train(sc: SparkContext, pd: ProcessedData) - : PAlgo1.Model = PAlgo1.Model(id, pd) - - override - def batchPredict(m: PAlgo1.Model, qs: RDD[(Long, Query)]) - : RDD[(Long, Prediction)] = { - qs.mapValues(q => Prediction(id, q, Some(m))) - } - - def predict(m: PAlgo1.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object PAlgo2 { - case class Model(id: Int, pd: ProcessedData) - case class Params(id: Int) extends PIOParams - } - - class PAlgo2(params: PAlgo2.Params) - extends PAlgorithm[ProcessedData, PAlgo2.Model, Query, Prediction] { - val id = params.id - - def train(sc: SparkContext, pd: ProcessedData) - : PAlgo2.Model = PAlgo2.Model(id, pd) - - override - def batchPredict(m: PAlgo2.Model, qs: RDD[(Long, Query)]) - : RDD[(Long, Prediction)] = { - qs.mapValues(q => Prediction(id, q, 
Some(m))) - } - - def predict(m: PAlgo2.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object PAlgo3 { - case class Model(id: Int, pd: ProcessedData) - extends LocalFileSystemPersistentModel[Params] - - object Model extends LocalFileSystemPersistentModelLoader[Params, Model] - - case class Params(id: Int) extends PIOParams - } - - class PAlgo3(params: PAlgo3.Params) - extends PAlgorithm[ProcessedData, PAlgo3.Model, Query, Prediction] { - val id = params.id - - def train(sc: SparkContext, pd: ProcessedData) - : PAlgo3.Model = PAlgo3.Model(id, pd) - - override - def batchPredict(m: PAlgo3.Model, qs: RDD[(Long, Query)]) - : RDD[(Long, Prediction)] = { - qs.mapValues(q => Prediction(id, q, Some(m))) - } - - def predict(m: PAlgo3.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object LAlgo0 { - case class Model(id: Int, pd: ProcessedData) - } - - class LAlgo0(id: Int = 0) - extends LAlgorithm[ProcessedData, LAlgo0.Model, Query, Prediction] { - def train(pd: ProcessedData): LAlgo0.Model = LAlgo0.Model(id, pd) - - def predict(m: LAlgo0.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object LAlgo1 { - case class Model(id: Int, pd: ProcessedData) - } - - class LAlgo1(id: Int = 0) - extends LAlgorithm[ProcessedData, LAlgo1.Model, Query, Prediction] { - def train(pd: ProcessedData): LAlgo1.Model = LAlgo1.Model(id, pd) - - def predict(m: LAlgo1.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object LAlgo2 { - case class Params(id: Int) extends PIOParams - - case class Model(id: Int, pd: ProcessedData) - extends LocalFileSystemPersistentModel[EmptyParams] - - object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] - } - - class LAlgo2(params: LAlgo2.Params) - extends LAlgorithm[ProcessedData, LAlgo2.Model, Query, Prediction] { - def train(pd: ProcessedData): LAlgo2.Model = LAlgo2.Model(params.id, pd) - - def predict(m: LAlgo2.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) - } - } - - object LAlgo3 { - case class Params(id: Int) extends PIOParams - - case class Model(id: Int, pd: ProcessedData) - } - - class LAlgo3(params: LAlgo3.Params) - extends LAlgorithm[ProcessedData, LAlgo3.Model, Query, Prediction] { - def train(pd: ProcessedData): LAlgo3.Model = LAlgo3.Model(params.id, pd) - - def predict(m: LAlgo3.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) - } - } - - // N : P2L. As N is in the middle of P and L. 
- object NAlgo0 { - case class Model(id: Int, pd: ProcessedData) - } - - class NAlgo0 (id: Int = 0) - extends P2LAlgorithm[ProcessedData, NAlgo0.Model, Query, Prediction] { - def train(sc: SparkContext, pd: ProcessedData) - : NAlgo0.Model = NAlgo0.Model(id, pd) - - def predict(m: NAlgo0.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object NAlgo1 { - case class Model(id: Int, pd: ProcessedData) - } - - class NAlgo1 (id: Int = 0) - extends P2LAlgorithm[ProcessedData, NAlgo1.Model, Query, Prediction] { - def train(sc: SparkContext, pd: ProcessedData) - : NAlgo1.Model = NAlgo1.Model(id, pd) - - def predict(m: NAlgo1.Model, q: Query): Prediction = { - Prediction(id, q, Some(m)) - } - } - - object NAlgo2 { - case class Params(id: Int) extends PIOParams - - case class Model(id: Int, pd: ProcessedData) - extends LocalFileSystemPersistentModel[EmptyParams] - - object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] - } - - class NAlgo2(params: NAlgo2.Params) - extends P2LAlgorithm[ProcessedData, NAlgo2.Model, Query, Prediction] { - def train(sc: SparkContext, pd: ProcessedData) - : NAlgo2.Model = NAlgo2.Model(params.id, pd) - - def predict(m: NAlgo2.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) - } - } - - object NAlgo3 { - case class Params(id: Int) extends PIOParams - - case class Model(id: Int, pd: ProcessedData) - } - - class NAlgo3(params: NAlgo3.Params) - extends P2LAlgorithm[ProcessedData, NAlgo3.Model, Query, Prediction] { - def train(sc: SparkContext, pd: ProcessedData) - : NAlgo3.Model = NAlgo3.Model(params.id, pd) - - def predict(m: NAlgo3.Model, q: Query): Prediction = { - Prediction(params.id, q, Some(m)) - } - } - - class LServing0(id: Int = 0) extends LServing[Query, Prediction] { - def serve(q: Query, ps: Seq[Prediction]): Prediction = { - Prediction(id, q, ps=ps) - } - } - - object LServing1 { - case class Params(id: Int) extends PIOParams - } - - class LServing1(params: LServing1.Params) extends LServing[Query, Prediction] { - def serve(q: Query, ps: Seq[Prediction]): Prediction = { - Prediction(params.id, q, ps=ps) - } - } - - class LServing2(id: Int) extends LServing[Query, Prediction] { - override - def supplement(q: Query): Query = q.copy(supp = true) - - def serve(q: Query, ps: Seq[Prediction]): Prediction = { - Prediction(id, q, ps=ps) - } - } -} - -object Engine1 { - case class EvalInfo(v: Double) extends Serializable - case class Query() extends Serializable - case class Prediction() extends Serializable - case class Actual() extends Serializable - case class DSP(v: Double) extends Params -} - -class Engine1 -extends BaseEngine[ - Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, Engine1.Actual] { - - def train( - sc: SparkContext, - engineParams: EngineParams, - engineInstanceId: String = "", - params: WorkflowParams = WorkflowParams()): Seq[Any] = Seq[Any]() - - def eval(sc: SparkContext, engineParams: EngineParams, params: WorkflowParams) - : Seq[(Engine1.EvalInfo, - RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])] = { - val dsp = engineParams.dataSourceParams._2.asInstanceOf[Engine1.DSP] - Seq( - (Engine1.EvalInfo(dsp.v), - sc.emptyRDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])) - } -} - - -class Metric0 -extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, -Engine1.Actual, Double] { - override def header: String = "Metric0" - - def calculate( - sc: SparkContext, - evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, - 
Engine1.Actual)])]): Double = { - evalDataSet.head._1.v - } -} - -object Metric1 { - case class Result(c: Int, v: Double) extends Serializable -} - -class Metric1 -extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, -Engine1.Actual, Metric1.Result]()(Ordering.by[Metric1.Result, Double](_.v)) { - override def header: String = "Metric1" - - def calculate( - sc: SparkContext, - evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, - Engine1.Actual)])]): Metric1.Result = { - Metric1.Result(0, evalDataSet.head._1.v) - } -} -
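The deleted SampleEngine.scala above defines the Engine0 fixtures (sample data sources, preparators, algorithms, servings, plus the Engine1/Metric0/Metric1 classes) that the test suites later in this commit exercise. As a rough sketch of how those fixtures compose, assuming the Engine, EngineParams, and WorkflowParams types of this code base and a SparkContext named sc (such as the one provided by SharedSparkContext below), a single-algorithm parallel engine can be assembled and trained as in the EngineSuite "Engine.train" test:

  import Engine0._

  // Wire the parameterized sample components into one engine.
  val engine = new Engine(
    classOf[PDataSource2],
    classOf[PPreparator1],
    Map("" -> classOf[PAlgo2]),
    classOf[LServing1])

  // Each component is configured through its Params case class; the ids make
  // it possible to trace which component produced which output.
  val engineParams = EngineParams(
    dataSourceParams = PDataSource2.Params(0),
    preparatorParams = PPreparator1.Params(1),
    algorithmParamsList = Seq(("", PAlgo2.Params(2))),
    servingParams = LServing1.Params(3))

  // One trained model per algorithm; PAlgo2.Model is not a persistent model,
  // so the returned element is Unit.
  val models = engine.train(
    sc, engineParams, engineInstanceId = "", params = WorkflowParams())
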
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/BaseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/workflow/BaseTest.scala b/core/src/test/scala/io/prediction/workflow/BaseTest.scala deleted file mode 100644 index 4925558..0000000 --- a/core/src/test/scala/io/prediction/workflow/BaseTest.scala +++ /dev/null @@ -1,75 +0,0 @@ -//package org.apache.spark -package io.prediction.workflow - -import _root_.io.netty.util.internal.logging.{Slf4JLoggerFactory, InternalLoggerFactory} -import org.scalatest.BeforeAndAfterAll -import org.scalatest.BeforeAndAfterEach -import org.scalatest.Suite -import org.apache.spark.SparkContext -import org.apache.spark.SparkConf - - -/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it - * after each test. */ -trait LocalSparkContext -extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite => - - @transient var sc: SparkContext = _ - - override def beforeAll() { - InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) - super.beforeAll() - } - - override def afterEach() { - resetSparkContext() - super.afterEach() - } - - def resetSparkContext() = { - LocalSparkContext.stop(sc) - sc = null - } - -} - -object LocalSparkContext { - def stop(sc: SparkContext) { - if (sc != null) { - sc.stop() - } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port") - } - - /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ - def withSpark[T](sc: SparkContext)(f: SparkContext => T) = { - try { - f(sc) - } finally { - stop(sc) - } - } - -} -/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */ -trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => - - @transient private var _sc: SparkContext = _ - - def sc: SparkContext = _sc - - var conf = new SparkConf(false) - - override def beforeAll() { - _sc = new SparkContext("local[4]", "test", conf) - super.beforeAll() - } - - override def afterAll() { - LocalSparkContext.stop(_sc) - _sc = null - super.afterAll() - } -} - http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/EngineWorkflowTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/workflow/EngineWorkflowTest.scala b/core/src/test/scala/io/prediction/workflow/EngineWorkflowTest.scala deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala b/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala deleted file mode 100644 index 7a50d33..0000000 --- a/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala +++ /dev/null @@ -1,61 +0,0 @@ -package io.prediction.workflow - -import io.prediction.controller._ - -import org.scalatest.FunSuite -import org.scalatest.Matchers._ - -class EvaluationWorkflowSuite extends FunSuite with SharedSparkContext { - - test("Evaluation return best engine params, simple result type: Double") { - val engine = new Engine1() - val ep0 = 
EngineParams(dataSourceParams = Engine1.DSP(0.2)) - val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) - val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) - val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2)) - val engineParamsList = Seq(ep0, ep1, ep2, ep3) - - val evaluator = MetricEvaluator(new Metric0()) - - object Eval extends Evaluation { - engineEvaluator = (new Engine1(), MetricEvaluator(new Metric0())) - } - - val result = EvaluationWorkflow.runEvaluation( - sc, - Eval, - engine, - engineParamsList, - evaluator, - WorkflowParams()) - - result.bestScore.score shouldBe 0.3 - result.bestEngineParams shouldBe ep1 - } - - test("Evaluation return best engine params, complex result type") { - val engine = new Engine1() - val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2)) - val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) - val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3)) - val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2)) - val engineParamsList = Seq(ep0, ep1, ep2, ep3) - - val evaluator = MetricEvaluator(new Metric1()) - - object Eval extends Evaluation { - engineEvaluator = (new Engine1(), MetricEvaluator(new Metric1())) - } - - val result = EvaluationWorkflow.runEvaluation( - sc, - Eval, - engine, - engineParamsList, - evaluator, - WorkflowParams()) - - result.bestScore.score shouldBe Metric1.Result(0, 0.3) - result.bestEngineParams shouldBe ep1 - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala b/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala deleted file mode 100644 index 34ff751..0000000 --- a/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala +++ /dev/null @@ -1,383 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.workflow - -import io.prediction.controller.EngineParams -import io.prediction.controller.Params -import io.prediction.controller.Utils -import org.json4s.CustomSerializer -import org.json4s.JsonAST.JField -import org.json4s.JsonAST.JObject -import org.json4s.JsonAST.JString -import org.json4s.MappingException -import org.json4s.native.JsonMethods.compact -import org.json4s.native.JsonMethods.render -import org.scalatest.FunSuite -import org.scalatest.Matchers - -class JsonExtractorSuite extends FunSuite with Matchers { - - test("Extract Scala object using option Json4sNative works with optional and default value " + - "provided") { - - val json = """{"string": "query string", "optional": "optional string", "default": "d"}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[ScalaQuery]) - - query should be (ScalaQuery("query string", Some("optional string"), "d")) - } - - test("Extract Scala object using option Json4sNative works with no optional and no default " + - "value provided") { - - val json = """{"string": "query string"}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[ScalaQuery]) - - query should be (ScalaQuery("query string", None, "default")) - } - - test("Extract Scala object using option Json4sNative works with null optional and null default" + - " value") { - - val json = """{"string": "query string", "optional": null, "default": null}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[ScalaQuery]) - - query should be (ScalaQuery("query string", None, "default")) - } - - test("Extract Scala object using option Both works with optional and default value provided") { - - val json = """{"string": "query string", "optional": "optional string", "default": "d"}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[ScalaQuery]) - - query should be (ScalaQuery("query string", Some("optional string"), "d")) - } - - test("Extract Scala object using option Both works with no optional and no default value " + - "provided") { - - val json = """{"string": "query string"}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[ScalaQuery]) - - query should be (ScalaQuery("query string", None, "default")) - } - - test("Extract Scala object using option Both works with null optional and null default value") { - - val json = """{"string": "query string", "optional": null, "default": null}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[ScalaQuery]) - - query should be (ScalaQuery("query string", None, "default")) - } - - test("Extract Scala object using option Gson should not get default value and optional none" + - " value") { - - val json = """{"string": "query string"}""" - val query = JsonExtractor.extract( - JsonExtractorOption.Gson, - json, - classOf[ScalaQuery]) - - query should be (ScalaQuery("query string", null, null)) - } - - test("Extract Scala object using option Gson should throw an exception with optional " + - "value provided") { - - val json = """{"string": "query string", "optional": "o", "default": "d"}""" - intercept[RuntimeException] { - JsonExtractor.extract( - JsonExtractorOption.Gson, - json, - classOf[ScalaQuery]) - } - } - - test("Extract Java object using option Gson works") { - - val json = """{"q": "query string"}""" - - val query = JsonExtractor.extract( - 
JsonExtractorOption.Gson, - json, - classOf[JavaQuery]) - - query should be (new JavaQuery("query string")) - } - - test("Extract Java object using option Both works") { - - val json = """{"q": "query string"}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Both, - json, - classOf[JavaQuery]) - - query should be (new JavaQuery("query string")) - } - - test("Extract Java object using option Json4sNative should throw an exception") { - - val json = """{"q": "query string"}""" - - intercept[MappingException] { - JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[JavaQuery]) - } - } - - test("Extract Scala object using option Json4sNative with custom deserializer") { - val json = """{"string": "query string", "optional": "o", "default": "d"}""" - - val query = JsonExtractor.extract( - JsonExtractorOption.Json4sNative, - json, - classOf[ScalaQuery], - Utils.json4sDefaultFormats + new UpperCaseFormat - ) - - query should be(ScalaQuery("QUERY STRING", Some("O"), "D")) - } - - test("Extract Java object usingoption Gson with custom deserializer") { - val json = """{"q": "query string"}""" - - val query = JsonExtractor.extract( - extractorOption = JsonExtractorOption.Gson, - json = json, - clazz = classOf[JavaQuery], - gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory) - ) - - query should be(new JavaQuery("QUERY STRING")) - } - - test("Java object to JValue using option Both works") { - val query = new JavaQuery("query string") - val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query) - - compact(render(jValue)) should be ("""{"q":"query string"}""") - } - - test("Java object to JValue using option Gson works") { - val query = new JavaQuery("query string") - val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query) - - compact(render(jValue)) should be ("""{"q":"query string"}""") - } - - test("Java object to JValue using option Json4sNative results in empty Json") { - val query = new JavaQuery("query string") - val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query) - - compact(render(jValue)) should be ("""{}""") - } - - test("Scala object to JValue using option Both works") { - val query = new ScalaQuery("query string", Some("option")) - val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query) - - compact(render(jValue)) should - be ("""{"string":"query string","optional":"option","default":"default"}""") - } - - test("Scala object to JValue using option Gson does not serialize optional") { - val query = new ScalaQuery("query string", Some("option")) - val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query) - - compact(render(jValue)) should - be ("""{"string":"query string","optional":{},"default":"default"}""") - } - - test("Scala object to JValue using option Json4sNative works") { - val query = new ScalaQuery("query string", Some("option")) - val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query) - - compact(render(jValue)) should - be ("""{"string":"query string","optional":"option","default":"default"}""") - } - - test("Scala object to JValue using option Json4sNative with custom serializer") { - val query = new ScalaQuery("query string", Some("option")) - val jValue = JsonExtractor.toJValue( - JsonExtractorOption.Json4sNative, - query, - Utils.json4sDefaultFormats + new UpperCaseFormat - ) - - compact(render(jValue)) should - be ("""{"string":"QUERY STRING","optional":"OPTION","default":"DEFAULT"}""") - } - - test("Java object to JValue using 
option Gson with custom serializer") { - val query = new JavaQuery("query string") - val jValue = JsonExtractor.toJValue( - extractorOption = JsonExtractorOption.Gson, - o = query, - gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory) - ) - - compact(render(jValue)) should be ("""{"q":"QUERY STRING"}""") - } - - test("Java Param to Json using option Both") { - val param = ("algo", new JavaParams("parameter")) - val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param) - - json should be ("""{"algo":{"p":"parameter"}}""") - } - - test("Java Param to Json using option Gson") { - val param = ("algo", new JavaParams("parameter")) - val json = JsonExtractor.paramToJson(JsonExtractorOption.Gson, param) - - json should be ("""{"algo":{"p":"parameter"}}""") - } - - test("Scala Param to Json using option Both") { - val param = ("algo", AlgorithmParams("parameter")) - val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param) - - json should be ("""{"algo":{"a":"parameter"}}""") - } - - test("Scala Param to Json using option Json4sNative") { - val param = ("algo", AlgorithmParams("parameter")) - val json = JsonExtractor.paramToJson(JsonExtractorOption.Json4sNative, param) - - json should be ("""{"algo":{"a":"parameter"}}""") - } - - test("Java Params to Json using option Both") { - val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2"))) - val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params) - - json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""") - } - - test("Java Params to Json using option Gson") { - val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new JavaParams("parameter2"))) - val json = JsonExtractor.paramsToJson(JsonExtractorOption.Gson, params) - - json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""") - } - - test("Scala Params to Json using option Both") { - val params = - Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2"))) - val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params) - - json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats)) - } - - test("Scala Params to Json using option Json4sNative") { - val params = - Seq(("algo", AlgorithmParams("parameter")), ("algo2", AlgorithmParams("parameter2"))) - val json = JsonExtractor.paramsToJson(JsonExtractorOption.Json4sNative, params) - - json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats)) - } - - test("Mixed Java and Scala Params to Json using option Both") { - val params = - Seq(("scala", AlgorithmParams("parameter")), ("java", new JavaParams("parameter2"))) - val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params) - - json should be ("""[{"scala":{"a":"parameter"}},{"java":{"p":"parameter2"}}]""") - } - - test("Serializing Scala EngineParams works using option Json4sNative") { - val ep = new EngineParams( - dataSourceParams = ("ds", DataSourceParams("dsp")), - algorithmParamsList = Seq(("a0", AlgorithmParams("ap")))) - - val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Json4sNative, ep) - - json should be ( - """{"dataSourceParams":{"ds":{"a":"dsp"}},"preparatorParams":{"":{}},""" + - """"algorithmParamsList":[{"a0":{"a":"ap"}}],"servingParams":{"":{}}}""") - } - - test("Serializing Java EngineParams works using option Gson") { - val ep = new EngineParams( - dataSourceParams = ("ds", new JavaParams("dsp")), - 
algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2")))) - - val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Gson, ep) - - json should be ( - """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" + - """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""") - } - - test("Serializing Java EngineParams works using option Both") { - val ep = new EngineParams( - dataSourceParams = ("ds", new JavaParams("dsp")), - algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new JavaParams("ap2")))) - - val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Both, ep) - - json should be ( - """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" + - """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""") - } -} - -private case class AlgorithmParams(a: String) extends Params - -private case class DataSourceParams(a: String) extends Params - -private case class ScalaQuery(string: String, optional: Option[String], default: String = "default") - -private class UpperCaseFormat extends CustomSerializer[ScalaQuery](format => ( { - case JObject(JField("string", JString(string)) :: - JField("optional", JString(optional)) :: - JField("default", JString(default)) :: - Nil) => ScalaQuery(string.toUpperCase, Some(optional.toUpperCase), default.toUpperCase) -}, { - case x: ScalaQuery => - JObject( - JField("string", JString(x.string.toUpperCase)), - JField("optional", JString(x.optional.get.toUpperCase)), - JField("default", JString(x.default.toUpperCase))) -})) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala new file mode 100644 index 0000000..eebe0af --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala @@ -0,0 +1,615 @@ +package org.apache.predictionio.controller + +import org.apache.predictionio.workflow.PersistentModelManifest +import org.apache.predictionio.workflow.SharedSparkContext +import org.apache.predictionio.workflow.StopAfterPrepareInterruption +import org.apache.predictionio.workflow.StopAfterReadInterruption + +import grizzled.slf4j.Logger +import org.apache.predictionio.workflow.WorkflowParams +import org.apache.spark.rdd.RDD +import org.scalatest.Inspectors._ +import org.scalatest.Matchers._ +import org.scalatest.FunSuite +import org.scalatest.Inside + +import scala.util.Random + +class EngineSuite +extends FunSuite with Inside with SharedSparkContext { + import org.apache.predictionio.controller.Engine0._ + @transient lazy val logger = Logger[this.type] + + test("Engine.train") { + val engine = new Engine( + classOf[PDataSource2], + classOf[PPreparator1], + Map("" -> classOf[PAlgo2]), + classOf[LServing1]) + + val engineParams = EngineParams( + dataSourceParams = PDataSource2.Params(0), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq(("", PAlgo2.Params(2))), + servingParams = LServing1.Params(3)) + + val models = engine.train( + sc, + engineParams, + engineInstanceId = "", + params = WorkflowParams()) + + val pd = ProcessedData(1, TrainingData(0)) + + // PAlgo2.Model doesn't have IPersistentModel trait implemented. 
Hence the + // model extract after train is Unit. + models should contain theSameElementsAs Seq(Unit) + } + + test("Engine.train persisting PAlgo.Model") { + val engine = new Engine( + classOf[PDataSource2], + classOf[PPreparator1], + Map( + "PAlgo2" -> classOf[PAlgo2], + "PAlgo3" -> classOf[PAlgo3] + ), + classOf[LServing1]) + + val engineParams = EngineParams( + dataSourceParams = PDataSource2.Params(0), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq( + ("PAlgo2", PAlgo2.Params(2)), + ("PAlgo3", PAlgo3.Params(21)), + ("PAlgo3", PAlgo3.Params(22)) + ), + servingParams = LServing1.Params(3)) + + val pd = ProcessedData(1, TrainingData(0)) + val model21 = PAlgo3.Model(21, pd) + val model22 = PAlgo3.Model(22, pd) + + val models = engine.train( + sc, + engineParams, + engineInstanceId = "", + params = WorkflowParams()) + + val pModel21 = PersistentModelManifest(model21.getClass.getName) + val pModel22 = PersistentModelManifest(model22.getClass.getName) + + models should contain theSameElementsAs Seq(Unit, pModel21, pModel22) + } + + test("Engine.train persisting LAlgo.Model") { + val engine = Engine( + classOf[LDataSource1], + classOf[LPreparator1], + Map( + "LAlgo1" -> classOf[LAlgo1], + "LAlgo2" -> classOf[LAlgo2], + "LAlgo3" -> classOf[LAlgo3] + ), + classOf[LServing1]) + + val engineParams = EngineParams( + dataSourceParams = LDataSource1.Params(0), + preparatorParams = LPreparator1.Params(1), + algorithmParamsList = Seq( + ("LAlgo2", LAlgo2.Params(20)), + ("LAlgo2", LAlgo2.Params(21)), + ("LAlgo3", LAlgo3.Params(22))), + servingParams = LServing1.Params(3)) + + val pd = ProcessedData(1, TrainingData(0)) + val model20 = LAlgo2.Model(20, pd) + val model21 = LAlgo2.Model(21, pd) + val model22 = LAlgo3.Model(22, pd) + + //val models = engine.train(sc, engineParams, WorkflowParams()) + val models = engine.train( + sc, + engineParams, + engineInstanceId = "", + params = WorkflowParams()) + + val pModel20 = PersistentModelManifest(model20.getClass.getName) + val pModel21 = PersistentModelManifest(model21.getClass.getName) + + models should contain theSameElementsAs Seq(pModel20, pModel21, model22) + } + + test("Engine.train persisting P&NAlgo.Model") { + val engine = new Engine( + classOf[PDataSource2], + classOf[PPreparator1], + Map( + "PAlgo2" -> classOf[PAlgo2], + "PAlgo3" -> classOf[PAlgo3], + "NAlgo2" -> classOf[NAlgo2], + "NAlgo3" -> classOf[NAlgo3] + ), + classOf[LServing1]) + + val engineParams = EngineParams( + dataSourceParams = PDataSource2.Params(0), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq( + ("PAlgo2", PAlgo2.Params(20)), + ("PAlgo3", PAlgo3.Params(21)), + ("PAlgo3", PAlgo3.Params(22)), + ("NAlgo2", NAlgo2.Params(23)), + ("NAlgo3", NAlgo3.Params(24)), + ("NAlgo3", NAlgo3.Params(25)) + ), + servingParams = LServing1.Params(3)) + + val pd = ProcessedData(1, TrainingData(0)) + val model21 = PAlgo3.Model(21, pd) + val model22 = PAlgo3.Model(22, pd) + val model23 = NAlgo2.Model(23, pd) + val model24 = NAlgo3.Model(24, pd) + val model25 = NAlgo3.Model(25, pd) + + //val models = engine.train(sc, engineParams, WorkflowParams()) + val models = engine.train( + sc, + engineParams, + engineInstanceId = "", + params = WorkflowParams()) + + val pModel21 = PersistentModelManifest(model21.getClass.getName) + val pModel22 = PersistentModelManifest(model22.getClass.getName) + val pModel23 = PersistentModelManifest(model23.getClass.getName) + + models should contain theSameElementsAs Seq( + Unit, pModel21, pModel22, pModel23, model24, model25) 
+ } + + test("Engine.eval") { + val engine = new Engine( + classOf[PDataSource2], + classOf[PPreparator1], + Map("" -> classOf[PAlgo2]), + classOf[LServing1]) + + val qn = 10 + val en = 3 + + val engineParams = EngineParams( + dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq(("", PAlgo2.Params(2))), + servingParams = LServing1.Params(3)) + + val algoCount = engineParams.algorithmParamsList.size + val pd = ProcessedData(1, TrainingData(0)) + val model0 = PAlgo2.Model(2, pd) + + val evalDataSet = engine.eval(sc, engineParams, WorkflowParams()) + + evalDataSet should have size en + + forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { + val (evalInfo, qpaRDD) = evalData + evalInfo shouldBe EvalInfo(0) + + val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect + + qpaSeq should have size qn + + forAll (qpaSeq) { case (q, p, a) => + val Query(qId, qEx, qQx, _) = q + val Actual(aId, aEx, aQx) = a + qId shouldBe aId + qEx shouldBe ex + aEx shouldBe ex + qQx shouldBe aQx + + inside (p) { case Prediction(pId, pQ, pModels, pPs) => { + pId shouldBe 3 + pQ shouldBe q + pModels shouldBe None + pPs should have size algoCount + pPs shouldBe Seq( + Prediction(id = 2, q = q, models = Some(model0))) + }} + } + }} + } + + test("Engine.prepareDeploy PAlgo") { + val engine = new Engine( + classOf[PDataSource2], + classOf[PPreparator1], + Map( + "PAlgo2" -> classOf[PAlgo2], + "PAlgo3" -> classOf[PAlgo3], + "NAlgo2" -> classOf[NAlgo2], + "NAlgo3" -> classOf[NAlgo3] + ), + classOf[LServing1]) + + val engineParams = EngineParams( + dataSourceParams = PDataSource2.Params(0), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq( + ("PAlgo2", PAlgo2.Params(20)), + ("PAlgo3", PAlgo3.Params(21)), + ("PAlgo3", PAlgo3.Params(22)), + ("NAlgo2", NAlgo2.Params(23)), + ("NAlgo3", NAlgo3.Params(24)), + ("NAlgo3", NAlgo3.Params(25)) + ), + servingParams = LServing1.Params(3)) + + val pd = ProcessedData(1, TrainingData(0)) + val model20 = PAlgo2.Model(20, pd) + val model21 = PAlgo3.Model(21, pd) + val model22 = PAlgo3.Model(22, pd) + val model23 = NAlgo2.Model(23, pd) + val model24 = NAlgo3.Model(24, pd) + val model25 = NAlgo3.Model(25, pd) + + val rand = new Random() + + val fakeEngineInstanceId = s"FakeInstanceId-${rand.nextLong()}" + + val persistedModels = engine.train( + sc, + engineParams, + engineInstanceId = fakeEngineInstanceId, + params = WorkflowParams() + ) + + val deployableModels = engine.prepareDeploy( + sc, + engineParams, + fakeEngineInstanceId, + persistedModels, + params = WorkflowParams() + ) + + deployableModels should contain theSameElementsAs Seq( + model20, model21, model22, model23, model24, model25) + } +} + +class EngineTrainSuite extends FunSuite with SharedSparkContext { + import org.apache.predictionio.controller.Engine0._ + val defaultWorkflowParams: WorkflowParams = WorkflowParams() + + test("Parallel DS/P/Algos") { + val models = Engine.train( + sc, + new PDataSource0(0), + new PPreparator0(1), + Seq( + new PAlgo0(2), + new PAlgo1(3), + new PAlgo0(4)), + defaultWorkflowParams + ) + + val pd = ProcessedData(1, TrainingData(0)) + + models should contain theSameElementsAs Seq( + PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd)) + } + + test("Local DS/P/Algos") { + val models = Engine.train( + sc, + new LDataSource0(0), + new LPreparator0(1), + Seq( + new LAlgo0(2), + new LAlgo1(3), + new LAlgo0(4)), + defaultWorkflowParams + ) + + val pd = ProcessedData(1, 
TrainingData(0)) + + val expectedResults = Seq( + LAlgo0.Model(2, pd), + LAlgo1.Model(3, pd), + LAlgo0.Model(4, pd)) + + forAll(models.zip(expectedResults)) { case (model, expected) => + model shouldBe a [RDD[_]] + val localModel = model.asInstanceOf[RDD[_]].collect + localModel should contain theSameElementsAs Seq(expected) + } + } + + test("P2L DS/P/Algos") { + val models = Engine.train( + sc, + new PDataSource0(0), + new PPreparator0(1), + Seq( + new NAlgo0(2), + new NAlgo1(3), + new NAlgo0(4)), + defaultWorkflowParams + ) + + val pd = ProcessedData(1, TrainingData(0)) + + models should contain theSameElementsAs Seq( + NAlgo0.Model(2, pd), NAlgo1.Model(3, pd), NAlgo0.Model(4, pd)) + } + + test("Parallel DS/P/Algos Stop-After-Read") { + val workflowParams = defaultWorkflowParams.copy( + stopAfterRead = true) + + an [StopAfterReadInterruption] should be thrownBy Engine.train( + sc, + new PDataSource0(0), + new PPreparator0(1), + Seq( + new PAlgo0(2), + new PAlgo1(3), + new PAlgo0(4)), + workflowParams + ) + } + + test("Parallel DS/P/Algos Stop-After-Prepare") { + val workflowParams = defaultWorkflowParams.copy( + stopAfterPrepare = true) + + an [StopAfterPrepareInterruption] should be thrownBy Engine.train( + sc, + new PDataSource0(0), + new PPreparator0(1), + Seq( + new PAlgo0(2), + new PAlgo1(3), + new PAlgo0(4)), + workflowParams + ) + } + + test("Parallel DS/P/Algos Dirty TrainingData") { + val workflowParams = defaultWorkflowParams.copy( + skipSanityCheck = false) + + an [AssertionError] should be thrownBy Engine.train( + sc, + new PDataSource3(0, error = true), + new PPreparator0(1), + Seq( + new PAlgo0(2), + new PAlgo1(3), + new PAlgo0(4)), + workflowParams + ) + } + + test("Parallel DS/P/Algos Dirty TrainingData But Skip Check") { + val workflowParams = defaultWorkflowParams.copy( + skipSanityCheck = true) + + val models = Engine.train( + sc, + new PDataSource3(0, error = true), + new PPreparator0(1), + Seq( + new PAlgo0(2), + new PAlgo1(3), + new PAlgo0(4)), + workflowParams + ) + + val pd = ProcessedData(1, TrainingData(0, error = true)) + + models should contain theSameElementsAs Seq( + PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd)) + } +} + + +class EngineEvalSuite +extends FunSuite with Inside with SharedSparkContext { + import org.apache.predictionio.controller.Engine0._ + + @transient lazy val logger = Logger[this.type] + + test("Simple Parallel DS/P/A/S") { + val en = 2 + val qn = 5 + + val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = + Engine.eval( + sc, + new PDataSource1(id = 1, en = en, qn = qn), + new PPreparator0(id = 2), + Seq(new PAlgo0(id = 3)), + new LServing0(id = 10)) + + val pd = ProcessedData(2, TrainingData(1)) + val model0 = PAlgo0.Model(3, pd) + + forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { + val (evalInfo, qpaRDD) = evalData + evalInfo shouldBe EvalInfo(1) + + val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect + forAll (qpaSeq) { case (q, p, a) => + val Query(qId, qEx, qQx, _) = q + val Actual(aId, aEx, aQx) = a + qId shouldBe aId + qEx shouldBe ex + aEx shouldBe ex + qQx shouldBe aQx + + inside (p) { case Prediction(pId, pQ, pModels, pPs) => { + pId shouldBe 10 + pQ shouldBe q + pModels shouldBe None + pPs should have size 1 + pPs shouldBe Seq( + Prediction(id = 3, q = q, models = Some(model0))) + }} + } + + }} + + } + + test("Parallel DS/P/A/S") { + val en = 2 + val qn = 5 + + val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = + Engine.eval( + sc, + new PDataSource1(id = 1, 
en = en, qn = qn), + new PPreparator0(id = 2), + Seq( + new PAlgo0(id = 3), + new PAlgo1(id = 4), + new NAlgo1(id = 5)), + new LServing0(id = 10)) + + val pd = ProcessedData(2, TrainingData(1)) + val model0 = PAlgo0.Model(3, pd) + val model1 = PAlgo1.Model(4, pd) + val model2 = NAlgo1.Model(5, pd) + + forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { + val (evalInfo, qpaRDD) = evalData + evalInfo shouldBe EvalInfo(1) + + val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect + forAll (qpaSeq) { case (q, p, a) => + val Query(qId, qEx, qQx, _) = q + val Actual(aId, aEx, aQx) = a + qId shouldBe aId + qEx shouldBe ex + aEx shouldBe ex + qQx shouldBe aQx + + inside (p) { case Prediction(pId, pQ, pModels, pPs) => { + pId shouldBe 10 + pQ shouldBe q + pModels shouldBe None + pPs should have size 3 + pPs shouldBe Seq( + Prediction(id = 3, q = q, models = Some(model0)), + Prediction(id = 4, q = q, models = Some(model1)), + Prediction(id = 5, q = q, models = Some(model2)) + ) + }} + } + }} + } + + test("Parallel DS/P/A/S with Supplemented Query") { + val en = 2 + val qn = 5 + + val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = + Engine.eval( + sc, + new PDataSource1(id = 1, en = en, qn = qn), + new PPreparator0(id = 2), + Seq( + new PAlgo0(id = 3), + new PAlgo1(id = 4), + new NAlgo1(id = 5)), + new LServing2(id = 10)) + + val pd = ProcessedData(2, TrainingData(1)) + val model0 = PAlgo0.Model(3, pd) + val model1 = PAlgo1.Model(4, pd) + val model2 = NAlgo1.Model(5, pd) + + forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { + val (evalInfo, qpaRDD) = evalData + evalInfo shouldBe EvalInfo(1) + + val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect + forAll (qpaSeq) { case (q, p, a) => + val Query(qId, qEx, qQx, qSupp) = q + val Actual(aId, aEx, aQx) = a + qId shouldBe aId + qEx shouldBe ex + aEx shouldBe ex + qQx shouldBe aQx + qSupp shouldBe false + + inside (p) { case Prediction(pId, pQ, pModels, pPs) => { + pId shouldBe 10 + pQ shouldBe q + pModels shouldBe None + pPs should have size 3 + // queries inside prediction should have supp set to true, since it + // represents what the algorithms see. 
+ val qSupp = q.copy(supp = true) + pPs shouldBe Seq( + Prediction(id = 3, q = qSupp, models = Some(model0)), + Prediction(id = 4, q = qSupp, models = Some(model1)), + Prediction(id = 5, q = qSupp, models = Some(model2)) + ) + }} + } + }} + } + + test("Local DS/P/A/S") { + val en = 2 + val qn = 5 + + val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = + Engine.eval( + sc, + new LDataSource0(id = 1, en = en, qn = qn), + new LPreparator0(id = 2), + Seq( + new LAlgo0(id = 3), + new LAlgo1(id = 4), + new LAlgo1(id = 5)), + new LServing0(id = 10)) + + val pd = ProcessedData(2, TrainingData(1)) + val model0 = LAlgo0.Model(3, pd) + val model1 = LAlgo1.Model(4, pd) + val model2 = LAlgo1.Model(5, pd) + + forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { + val (evalInfo, qpaRDD) = evalData + evalInfo shouldBe EvalInfo(1) + + val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect + forAll (qpaSeq) { case (q, p, a) => + val Query(qId, qEx, qQx, _) = q + val Actual(aId, aEx, aQx) = a + qId shouldBe aId + qEx shouldBe ex + aEx shouldBe ex + qQx shouldBe aQx + + inside (p) { case Prediction(pId, pQ, pModels, pPs) => { + pId shouldBe 10 + pQ shouldBe q + pModels shouldBe None + pPs should have size 3 + pPs shouldBe Seq( + Prediction(id = 3, q = q, models = Some(model0)), + Prediction(id = 4, q = q, models = Some(model1)), + Prediction(id = 5, q = q, models = Some(model2)) + ) + }} + } + + }} + + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala new file mode 100644 index 0000000..86fe68c --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala @@ -0,0 +1,46 @@ +package org.apache.predictionio.controller + +import org.apache.predictionio.workflow.SharedSparkContext + +import org.scalatest.FunSuite +import org.scalatest.Inside +import org.scalatest.Matchers._ + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +object EvaluationSuite { + import org.apache.predictionio.controller.TestEvaluator._ + + class Metric0 extends Metric[EvalInfo, Query, Prediction, Actual, Int] { + def calculate( + sc: SparkContext, + evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])]): Int = 1 + } + + object Evaluation0 extends Evaluation { + engineMetric = (new FakeEngine(1, 1, 1), new Metric0()) + } +} + + +class EvaluationSuite +extends FunSuite with Inside with SharedSparkContext { + import org.apache.predictionio.controller.EvaluationSuite._ + + test("Evaluation makes MetricEvaluator") { + // MetricEvaluator is typed [EvalInfo, Query, Prediction, Actual, Int], + // however this information is erased on JVM. scalatest doc recommends to + // use wildcards. 
+ Evaluation0.evaluator shouldBe a [MetricEvaluator[_, _, _, _, _]] + } + + test("Load from class path") { + val r = org.apache.predictionio.workflow.WorkflowUtils.getEvaluation( + "org.apache.predictionio.controller.EvaluationSuite.Evaluation0", + getClass.getClassLoader) + + r._2 shouldBe EvaluationSuite.Evaluation0 + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala new file mode 100644 index 0000000..c2668ac --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala @@ -0,0 +1,93 @@ +package org.apache.predictionio.controller + +import org.apache.predictionio.core._ +import org.apache.predictionio.workflow.WorkflowParams + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +object TestEvaluator { + case class EvalInfo(id: Int, ex: Int) + case class Query(id: Int, ex: Int, qx: Int) + case class Prediction(id: Int, ex: Int, qx: Int) + case class Actual(id: Int, ex: Int, qx: Int) + + class FakeEngine(val id: Int, val en: Int, val qn: Int) + extends BaseEngine[EvalInfo, Query, Prediction, Actual] { + def train( + sc: SparkContext, + engineParams: EngineParams, + instanceId: String = "", + params: WorkflowParams = WorkflowParams() + ): Seq[Any] = { + Seq[Any]() + } + + def eval( + sc: SparkContext, + engineParams: EngineParams, + params: WorkflowParams) + : Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = { + (0 until en).map { ex => { + val qpas = (0 until qn).map { qx => { + (Query(id, ex, qx), Prediction(id, ex, qx), Actual(id, ex, qx)) + }} + + (EvalInfo(id = id, ex = ex), sc.parallelize(qpas)) + }} + } + + } + + /* + class Evaluator0 extends Evaluator[EvalInfo, Query, Prediction, Actual, + (Query, Prediction, Actual), + (EvalInfo, Seq[(Query, Prediction, Actual)]), + Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))] + ] { + + def evaluateUnit(q: Query, p: Prediction, a: Actual) + : (Query, Prediction, Actual) = (q, p, a) + + def evaluateSet( + evalInfo: EvalInfo, + eus: Seq[(Query, Prediction, Actual)]) + : (EvalInfo, Seq[(Query, Prediction, Actual)]) = (evalInfo, eus) + + def evaluateAll( + input: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))]) + = input + } + */ + +} + +/* +class EvaluatorSuite +extends FunSuite with Inside with SharedSparkContext { + import org.apache.predictionio.controller.TestEvaluator._ + @transient lazy val logger = Logger[this.type] + + test("Evaluator.evaluate") { + val engine = new FakeEngine(1, 3, 10) + val evaluator = new Evaluator0() + + val evalDataSet = engine.eval(sc, null.asInstanceOf[EngineParams]) + val er: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))] = + evaluator.evaluateBase(sc, evalDataSet) + + evalDataSet.zip(er).map { case (input, output) => { + val (inputEvalInfo, inputQpaRDD) = input + val (outputEvalInfo, (outputEvalInfo2, outputQpaSeq)) = output + + inputEvalInfo shouldBe outputEvalInfo + inputEvalInfo shouldBe outputEvalInfo2 + + val inputQpaSeq: Array[(Query, Prediction, Actual)] = inputQpaRDD.collect + + inputQpaSeq.size should be (outputQpaSeq.size) + // TODO. match inputQpa and outputQpa content. 
+ }} + } +} +*/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala b/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala new file mode 100644 index 0000000..a4dc42f --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala @@ -0,0 +1,181 @@ +package org.apache.predictionio.controller + +import org.apache.predictionio.workflow.WorkflowParams +import org.scalatest.FunSuite +import org.scalatest.Inside +import org.scalatest.Matchers._ +import org.scalatest.Inspectors._ + +import org.apache.predictionio.workflow.SharedSparkContext + +class FastEngineSuite +extends FunSuite with Inside with SharedSparkContext { + import org.apache.predictionio.controller.Engine0._ + + test("Single Evaluation") { + val engine = new FastEvalEngine( + Map("" -> classOf[PDataSource2]), + Map("" -> classOf[PPreparator1]), + Map( + "PAlgo2" -> classOf[PAlgo2], + "PAlgo3" -> classOf[PAlgo3] + ), + Map("" -> classOf[LServing1])) + + val qn = 10 + val en = 3 + + val engineParams = EngineParams( + dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq( + ("PAlgo2", PAlgo2.Params(20)), + ("PAlgo2", PAlgo2.Params(21)), + ("PAlgo3", PAlgo3.Params(22)) + ), + servingParams = LServing1.Params(3)) + + val algoCount = engineParams.algorithmParamsList.size + val pd = ProcessedData(1, TrainingData(0)) + val model0 = PAlgo2.Model(20, pd) + val model1 = PAlgo2.Model(21, pd) + val model2 = PAlgo3.Model(22, pd) + + val evalDataSet = engine.eval(sc, engineParams, WorkflowParams()) + + evalDataSet should have size en + + forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { + val (evalInfo, qpaRDD) = evalData + evalInfo shouldBe EvalInfo(0) + + val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect + + qpaSeq should have size qn + + forAll (qpaSeq) { case (q, p, a) => + val Query(qId, qEx, qQx, _) = q + val Actual(aId, aEx, aQx) = a + qId shouldBe aId + qEx shouldBe ex + aEx shouldBe ex + qQx shouldBe aQx + + inside (p) { case Prediction(pId, pQ, pModels, pPs) => { + pId shouldBe 3 + pQ shouldBe q + pModels shouldBe None + pPs should have size algoCount + pPs shouldBe Seq( + Prediction(id = 20, q = q, models = Some(model0)), + Prediction(id = 21, q = q, models = Some(model1)), + Prediction(id = 22, q = q, models = Some(model2)) + ) + }} + } + }} + } + + test("Batch Evaluation") { + val engine = new FastEvalEngine( + Map("" -> classOf[PDataSource2]), + Map("" -> classOf[PPreparator1]), + Map("" -> classOf[PAlgo2]), + Map("" -> classOf[LServing1])) + + val qn = 10 + val en = 3 + + val baseEngineParams = EngineParams( + dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq(("", PAlgo2.Params(2))), + servingParams = LServing1.Params(3)) + + val ep0 = baseEngineParams + val ep1 = baseEngineParams.copy( + algorithmParamsList = Seq(("", PAlgo2.Params(2)))) + val ep2 = baseEngineParams.copy( + algorithmParamsList = Seq(("", PAlgo2.Params(20)))) + + val engineEvalDataSet = engine.batchEval( + sc, + Seq(ep0, ep1, ep2), + WorkflowParams()) + + val evalDataSet0 = engineEvalDataSet(0)._2 + val evalDataSet1 = engineEvalDataSet(1)._2 + val 
evalDataSet2 = engineEvalDataSet(2)._2 + + evalDataSet0 shouldBe evalDataSet1 + evalDataSet0 should not be evalDataSet2 + evalDataSet1 should not be evalDataSet2 + + // evalDataSet0._1 should be theSameInstanceAs evalDataSet1._1 + // When things are cached correctly, evalDataSet0 and 1 should share the + // same EI + evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => { + e0._1 should be theSameInstanceAs e1._1 + e0._2 should be theSameInstanceAs e1._2 + }} + + // The same EI sharing applies to set1 and set2; their QPA RDDs, however, should be different. + evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => { + e1._1 should be theSameInstanceAs e2._1 + val e1Qpa = e1._2 + val e2Qpa = e2._2 + e1Qpa should not be theSameInstanceAs (e2Qpa) + }} + } + + test("Not cached when isEqual not implemented") { + // PDataSource4.Params is a plain class, not a case class, so it would need to + // implement the isEqual function for hashing to be cached. + val engine = new FastEvalEngine( + Map("" -> classOf[PDataSource4]), + Map("" -> classOf[PPreparator1]), + Map("" -> classOf[PAlgo2]), + Map("" -> classOf[LServing1])) + + val qn = 10 + val en = 3 + + val baseEngineParams = EngineParams( + dataSourceParams = new PDataSource4.Params(id = 0, en = en, qn = qn), + preparatorParams = PPreparator1.Params(1), + algorithmParamsList = Seq(("", PAlgo2.Params(2))), + servingParams = LServing1.Params(3)) + + val ep0 = baseEngineParams + val ep1 = baseEngineParams.copy( + algorithmParamsList = Seq(("", PAlgo2.Params(3)))) + // ep2.dataSource is different from ep0. + val ep2 = baseEngineParams.copy( + dataSourceParams = ("", new PDataSource4.Params(id = 0, en = en, qn = qn)), + algorithmParamsList = Seq(("", PAlgo2.Params(3)))) + + val engineEvalDataSet = engine.batchEval( + sc, + Seq(ep0, ep1, ep2), + WorkflowParams()) + + val evalDataSet0 = engineEvalDataSet(0)._2 + val evalDataSet1 = engineEvalDataSet(1)._2 + val evalDataSet2 = engineEvalDataSet(2)._2 + + evalDataSet0 should not be evalDataSet1 + evalDataSet0 should not be evalDataSet2 + evalDataSet1 should not be evalDataSet2 + + // Set0 should have the same EI as Set1, since their dsp is the same instance. + evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => { + e0._1 should be theSameInstanceAs (e1._1) + }} + + // Set1 should have a different EI from Set2, since Set2's dsp is another + // instance. + evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => { + e1._1 should not be theSameInstanceAs (e2._1) + }} + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala b/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala new file mode 100644 index 0000000..a7e397a --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala @@ -0,0 +1,52 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.workflow.SharedSparkContext +import org.apache.predictionio.workflow.WorkflowParams +import org.scalatest.FunSuite + +object MetricEvaluatorSuite { + case class Metric0() extends SumMetric[EmptyParams, Int, Int, Int, Int] { + def calculate(q: Int, p: Int, a: Int): Int = q + } + + object Evaluation0 extends Evaluation {} +} + +class MetricEvaluatorDevSuite extends FunSuite with SharedSparkContext { + import org.apache.predictionio.controller.MetricEvaluatorSuite._ + + test("a") { + val metricEvaluator = MetricEvaluator( + Metric0(), + Seq(Metric0(), Metric0()) + ) + + val engineEvalDataSet = Seq( + (EngineParams(), Seq( + (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))), + (EngineParams(), Seq( + (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0))))))) + + val r = metricEvaluator.evaluateBase( + sc, + Evaluation0, + engineEvalDataSet, + WorkflowParams()) + + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala b/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala new file mode 100644 index 0000000..67975b1 --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala @@ -0,0 +1,143 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.workflow.SharedSparkContext + +import grizzled.slf4j.Logger +import org.scalatest.Matchers._ +import org.scalatest.FunSuite +import org.scalatest.Inside + +object MetricDevSuite { + class QIntSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Int] { + def calculate(q: Int, p: Int, a: Int): Int = q + } + + class QDoubleSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Double] { + def calculate(q: Int, p: Int, a: Int): Double = q.toDouble + } + + class QAverageMetric extends AverageMetric[EmptyParams, Int, Int, Int] { + def calculate(q: Int, p: Int, a: Int): Double = q.toDouble + } + + class QOptionAverageMetric extends OptionAverageMetric[EmptyParams, Int, Int, Int] { + def calculate(q: Int, p: Int, a: Int): Option[Double] = { + if (q < 0) { None } else { Some(q.toDouble) } + } + } + + class QStdevMetric extends StdevMetric[EmptyParams, Int, Int, Int] { + def calculate(q: Int, p: Int, a: Int): Double = q.toDouble + } + + class QOptionStdevMetric extends OptionStdevMetric[EmptyParams, Int, Int, Int] { + def calculate(q: Int, p: Int, a: Int): Option[Double] = { + if (q < 0) { None } else { Some(q.toDouble) } + } + } + +} + +class MetricDevSuite +extends FunSuite with Inside with SharedSparkContext { + @transient lazy val logger = Logger[this.type] + + test("Average Metric") { + val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) + val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0)) + + val evalDataSet = Seq( + (EmptyParams(), sc.parallelize(qpaSeq0)), + (EmptyParams(), sc.parallelize(qpaSeq1))) + + val m = new MetricDevSuite.QAverageMetric() + val result = m.calculate(sc, evalDataSet) + + result shouldBe (21.0 / 6) + } + + test("Option Average Metric") { + val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) + val qpaSeq1 = Seq((-4, 0, 0), (-5, 0, 0), (6, 0, 0)) + + val evalDataSet = Seq( + (EmptyParams(), sc.parallelize(qpaSeq0)), + (EmptyParams(), sc.parallelize(qpaSeq1))) + + val m = new MetricDevSuite.QOptionAverageMetric() + val result = m.calculate(sc, evalDataSet) + + result shouldBe (12.0 / 4) + } + + test("Stdev Metric") { + val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0)) + val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0)) + + val evalDataSet = Seq( + (EmptyParams(), sc.parallelize(qpaSeq0)), + (EmptyParams(), sc.parallelize(qpaSeq1))) + + val m = new MetricDevSuite.QStdevMetric() + val result = m.calculate(sc, evalDataSet) + + result shouldBe 2.0 + } + + test("Option Stdev Metric") { + val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0)) + val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0), (-5, 0, 0)) + + val evalDataSet = Seq( + (EmptyParams(), sc.parallelize(qpaSeq0)), + (EmptyParams(), sc.parallelize(qpaSeq1))) + + val m = new MetricDevSuite.QOptionStdevMetric() + val result = m.calculate(sc, evalDataSet) + + result shouldBe 2.0 + } + + test("Sum Metric [Int]") { + val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) + val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0)) + + val evalDataSet = Seq( + (EmptyParams(), sc.parallelize(qpaSeq0)), + (EmptyParams(), sc.parallelize(qpaSeq1))) + + val m = new MetricDevSuite.QIntSumMetric() + val result = m.calculate(sc, evalDataSet) + + result shouldBe 21 + } + + test("Sum Metric [Double]") { + val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) + val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0)) + + val evalDataSet = Seq( + (EmptyParams(), sc.parallelize(qpaSeq0)), + (EmptyParams(), 
sc.parallelize(qpaSeq1))) + + val m = new MetricDevSuite.QDoubleSumMetric() + val result = m.calculate(sc, evalDataSet) + + result shouldBe 21.0 + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala new file mode 100644 index 0000000..e238e86 --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala @@ -0,0 +1,472 @@ +package org.apache.predictionio.controller + +import org.apache.predictionio.controller.{Params => PIOParams} +import org.apache.predictionio.core._ + +import grizzled.slf4j.Logger +import org.apache.predictionio.workflow.WorkflowParams +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +object Engine0 { + @transient lazy val logger = Logger[this.type] + + case class TrainingData(id: Int, error: Boolean = false) extends SanityCheck { + def sanityCheck(): Unit = { + Predef.assert(!error, "Not Error") + } + } + + case class EvalInfo(id: Int) + case class ProcessedData(id: Int, td: TrainingData) + + case class Query(id: Int, ex: Int = 0, qx: Int = 0, supp: Boolean = false) + case class Actual(id: Int, ex: Int = 0, qx: Int = 0) + case class Prediction( + id: Int, q: Query, models: Option[Any] = None, + ps: Seq[Prediction] = Seq[Prediction]()) + + class PDataSource0(id: Int = 0) + extends PDataSource[TrainingData, EvalInfo, Query, Actual] { + def readTraining(sc: SparkContext): TrainingData = { + TrainingData(id) + } + } + + class PDataSource1(id: Int = 0, en: Int = 0, qn: Int = 0) + extends PDataSource[TrainingData, EvalInfo, Query, Actual] { + def readTraining(sc: SparkContext): TrainingData = TrainingData(id) + + override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = { + (0 until en).map { ex => { + val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => { + (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) + }} + (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq)) + }} + } + } + + object PDataSource2 { + case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams + } + + class PDataSource2(params: PDataSource2.Params) + extends PDataSource[TrainingData, EvalInfo, Query, Actual] { + val id = params.id + def readTraining(sc: SparkContext): TrainingData = TrainingData(id) + + override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = { + (0 until params.en).map { ex => { + val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => { + (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) + }} + (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq)) + }} + } + } + + class PDataSource3(id: Int = 0, error: Boolean = false) + extends PDataSource[TrainingData, EvalInfo, Query, Actual] { + def readTraining(sc: SparkContext): TrainingData = { + TrainingData(id = id, error = error) + } + } + + object PDataSource4 { + class Params(val id: Int, val en: Int = 0, val qn: Int = 0) + extends PIOParams + } + + class PDataSource4(params: PDataSource4.Params) + extends PDataSource[TrainingData, EvalInfo, Query, Actual] { + val id = params.id + def readTraining(sc: SparkContext): TrainingData = TrainingData(id) + + override + def readEval(sc: SparkContext) + : 
Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = { + (0 until params.en).map { ex => { + val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => { + (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) + }} + (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq)) + }} + } + } + + class LDataSource0(id: Int, en: Int = 0, qn: Int = 0) + extends LDataSource[TrainingData, EvalInfo, Query, Actual] { + def readTraining(): TrainingData = TrainingData(id) + + override + def readEval() + : Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = { + (0 until en).map { ex => { + val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => { + (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) + }} + (TrainingData(id), EvalInfo(id), qaSeq) + }} + } + } + + object LDataSource1 { + case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams + } + + class LDataSource1(params: LDataSource1.Params) + extends LDataSource[TrainingData, EvalInfo, Query, Actual] { + val id = params.id + def readTraining(): TrainingData = TrainingData(id) + + override + def readEval(): Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = { + (0 until params.en).map { ex => { + val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => { + (Query(id, ex=ex, qx=qx), Actual(id, ex, qx)) + }} + (TrainingData(id), EvalInfo(id), qaSeq) + }} + } + } + + class PPreparator0(id: Int = 0) + extends PPreparator[TrainingData, ProcessedData] { + def prepare(sc: SparkContext, td: TrainingData): ProcessedData = { + ProcessedData(id, td) + } + } + + object PPreparator1 { + case class Params(id: Int = 0) extends PIOParams + } + + class PPreparator1(params: PPreparator1.Params) + extends PPreparator[TrainingData, ProcessedData] { + def prepare(sc: SparkContext, td: TrainingData): ProcessedData = { + ProcessedData(params.id, td) + } + } + + class LPreparator0(id: Int = 0) + extends LPreparator[TrainingData, ProcessedData] { + def prepare(td: TrainingData): ProcessedData = { + ProcessedData(id, td) + } + } + + object LPreparator1 { + case class Params(id: Int = 0) extends PIOParams + } + + class LPreparator1(params: LPreparator1.Params) + extends LPreparator[TrainingData, ProcessedData] { + def prepare(td: TrainingData): ProcessedData = { + ProcessedData(params.id, td) + } + } + + object PAlgo0 { + case class Model(id: Int, pd: ProcessedData) + } + + class PAlgo0(id: Int = 0) + extends PAlgorithm[ProcessedData, PAlgo0.Model, Query, Prediction] { + def train(sc: SparkContext, pd: ProcessedData) + : PAlgo0.Model = PAlgo0.Model(id, pd) + + override + def batchPredict(m: PAlgo0.Model, qs: RDD[(Long, Query)]) + : RDD[(Long, Prediction)] = { + qs.mapValues(q => Prediction(id, q, Some(m))) + } + + def predict(m: PAlgo0.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object PAlgo1 { + case class Model(id: Int, pd: ProcessedData) + } + + class PAlgo1(id: Int = 0) + extends PAlgorithm[ProcessedData, PAlgo1.Model, Query, Prediction] { + def train(sc: SparkContext, pd: ProcessedData) + : PAlgo1.Model = PAlgo1.Model(id, pd) + + override + def batchPredict(m: PAlgo1.Model, qs: RDD[(Long, Query)]) + : RDD[(Long, Prediction)] = { + qs.mapValues(q => Prediction(id, q, Some(m))) + } + + def predict(m: PAlgo1.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object PAlgo2 { + case class Model(id: Int, pd: ProcessedData) + case class Params(id: Int) extends PIOParams + } + + class PAlgo2(params: PAlgo2.Params) + extends PAlgorithm[ProcessedData, PAlgo2.Model, Query, Prediction] { + val id = params.id 
+ + def train(sc: SparkContext, pd: ProcessedData) + : PAlgo2.Model = PAlgo2.Model(id, pd) + + override + def batchPredict(m: PAlgo2.Model, qs: RDD[(Long, Query)]) + : RDD[(Long, Prediction)] = { + qs.mapValues(q => Prediction(id, q, Some(m))) + } + + def predict(m: PAlgo2.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object PAlgo3 { + case class Model(id: Int, pd: ProcessedData) + extends LocalFileSystemPersistentModel[Params] + + object Model extends LocalFileSystemPersistentModelLoader[Params, Model] + + case class Params(id: Int) extends PIOParams + } + + class PAlgo3(params: PAlgo3.Params) + extends PAlgorithm[ProcessedData, PAlgo3.Model, Query, Prediction] { + val id = params.id + + def train(sc: SparkContext, pd: ProcessedData) + : PAlgo3.Model = PAlgo3.Model(id, pd) + + override + def batchPredict(m: PAlgo3.Model, qs: RDD[(Long, Query)]) + : RDD[(Long, Prediction)] = { + qs.mapValues(q => Prediction(id, q, Some(m))) + } + + def predict(m: PAlgo3.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object LAlgo0 { + case class Model(id: Int, pd: ProcessedData) + } + + class LAlgo0(id: Int = 0) + extends LAlgorithm[ProcessedData, LAlgo0.Model, Query, Prediction] { + def train(pd: ProcessedData): LAlgo0.Model = LAlgo0.Model(id, pd) + + def predict(m: LAlgo0.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object LAlgo1 { + case class Model(id: Int, pd: ProcessedData) + } + + class LAlgo1(id: Int = 0) + extends LAlgorithm[ProcessedData, LAlgo1.Model, Query, Prediction] { + def train(pd: ProcessedData): LAlgo1.Model = LAlgo1.Model(id, pd) + + def predict(m: LAlgo1.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object LAlgo2 { + case class Params(id: Int) extends PIOParams + + case class Model(id: Int, pd: ProcessedData) + extends LocalFileSystemPersistentModel[EmptyParams] + + object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] + } + + class LAlgo2(params: LAlgo2.Params) + extends LAlgorithm[ProcessedData, LAlgo2.Model, Query, Prediction] { + def train(pd: ProcessedData): LAlgo2.Model = LAlgo2.Model(params.id, pd) + + def predict(m: LAlgo2.Model, q: Query): Prediction = { + Prediction(params.id, q, Some(m)) + } + } + + object LAlgo3 { + case class Params(id: Int) extends PIOParams + + case class Model(id: Int, pd: ProcessedData) + } + + class LAlgo3(params: LAlgo3.Params) + extends LAlgorithm[ProcessedData, LAlgo3.Model, Query, Prediction] { + def train(pd: ProcessedData): LAlgo3.Model = LAlgo3.Model(params.id, pd) + + def predict(m: LAlgo3.Model, q: Query): Prediction = { + Prediction(params.id, q, Some(m)) + } + } + + // N : P2L. As N is in the middle of P and L. 
+ object NAlgo0 { + case class Model(id: Int, pd: ProcessedData) + } + + class NAlgo0 (id: Int = 0) + extends P2LAlgorithm[ProcessedData, NAlgo0.Model, Query, Prediction] { + def train(sc: SparkContext, pd: ProcessedData) + : NAlgo0.Model = NAlgo0.Model(id, pd) + + def predict(m: NAlgo0.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object NAlgo1 { + case class Model(id: Int, pd: ProcessedData) + } + + class NAlgo1 (id: Int = 0) + extends P2LAlgorithm[ProcessedData, NAlgo1.Model, Query, Prediction] { + def train(sc: SparkContext, pd: ProcessedData) + : NAlgo1.Model = NAlgo1.Model(id, pd) + + def predict(m: NAlgo1.Model, q: Query): Prediction = { + Prediction(id, q, Some(m)) + } + } + + object NAlgo2 { + case class Params(id: Int) extends PIOParams + + case class Model(id: Int, pd: ProcessedData) + extends LocalFileSystemPersistentModel[EmptyParams] + + object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model] + } + + class NAlgo2(params: NAlgo2.Params) + extends P2LAlgorithm[ProcessedData, NAlgo2.Model, Query, Prediction] { + def train(sc: SparkContext, pd: ProcessedData) + : NAlgo2.Model = NAlgo2.Model(params.id, pd) + + def predict(m: NAlgo2.Model, q: Query): Prediction = { + Prediction(params.id, q, Some(m)) + } + } + + object NAlgo3 { + case class Params(id: Int) extends PIOParams + + case class Model(id: Int, pd: ProcessedData) + } + + class NAlgo3(params: NAlgo3.Params) + extends P2LAlgorithm[ProcessedData, NAlgo3.Model, Query, Prediction] { + def train(sc: SparkContext, pd: ProcessedData) + : NAlgo3.Model = NAlgo3.Model(params.id, pd) + + def predict(m: NAlgo3.Model, q: Query): Prediction = { + Prediction(params.id, q, Some(m)) + } + } + + class LServing0(id: Int = 0) extends LServing[Query, Prediction] { + def serve(q: Query, ps: Seq[Prediction]): Prediction = { + Prediction(id, q, ps=ps) + } + } + + object LServing1 { + case class Params(id: Int) extends PIOParams + } + + class LServing1(params: LServing1.Params) extends LServing[Query, Prediction] { + def serve(q: Query, ps: Seq[Prediction]): Prediction = { + Prediction(params.id, q, ps=ps) + } + } + + class LServing2(id: Int) extends LServing[Query, Prediction] { + override + def supplement(q: Query): Query = q.copy(supp = true) + + def serve(q: Query, ps: Seq[Prediction]): Prediction = { + Prediction(id, q, ps=ps) + } + } +} + +object Engine1 { + case class EvalInfo(v: Double) extends Serializable + case class Query() extends Serializable + case class Prediction() extends Serializable + case class Actual() extends Serializable + case class DSP(v: Double) extends Params +} + +class Engine1 +extends BaseEngine[ + Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, Engine1.Actual] { + + def train( + sc: SparkContext, + engineParams: EngineParams, + engineInstanceId: String = "", + params: WorkflowParams = WorkflowParams()): Seq[Any] = Seq[Any]() + + def eval(sc: SparkContext, engineParams: EngineParams, params: WorkflowParams) + : Seq[(Engine1.EvalInfo, + RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])] = { + val dsp = engineParams.dataSourceParams._2.asInstanceOf[Engine1.DSP] + Seq( + (Engine1.EvalInfo(dsp.v), + sc.emptyRDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])) + } +} + + +class Metric0 +extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, +Engine1.Actual, Double] { + override def header: String = "Metric0" + + def calculate( + sc: SparkContext, + evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, + 
Engine1.Actual)])]): Double = { + evalDataSet.head._1.v + } +} + +object Metric1 { + case class Result(c: Int, v: Double) extends Serializable +} + +class Metric1 +extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, +Engine1.Actual, Metric1.Result]()(Ordering.by[Metric1.Result, Double](_.v)) { + override def header: String = "Metric1" + + def calculate( + sc: SparkContext, + evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction, + Engine1.Actual)])]): Metric1.Result = { + Metric1.Result(0, evalDataSet.head._1.v) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala b/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala new file mode 100644 index 0000000..df36620 --- /dev/null +++ b/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala @@ -0,0 +1,75 @@ +//package org.apache.spark +package org.apache.predictionio.workflow + +import _root_.io.netty.util.internal.logging.{Slf4JLoggerFactory, InternalLoggerFactory} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach +import org.scalatest.Suite +import org.apache.spark.SparkContext +import org.apache.spark.SparkConf + + +/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it + * after each test. */ +trait LocalSparkContext +extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite => + + @transient var sc: SparkContext = _ + + override def beforeAll() { + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) + super.beforeAll() + } + + override def afterEach() { + resetSparkContext() + super.afterEach() + } + + def resetSparkContext() = { + LocalSparkContext.stop(sc) + sc = null + } + +} + +object LocalSparkContext { + def stop(sc: SparkContext) { + if (sc != null) { + sc.stop() + } + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port") + } + + /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */ + def withSpark[T](sc: SparkContext)(f: SparkContext => T) = { + try { + f(sc) + } finally { + stop(sc) + } + } + +} +/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */ +trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => + + @transient private var _sc: SparkContext = _ + + def sc: SparkContext = _sc + + var conf = new SparkConf(false) + + override def beforeAll() { + _sc = new SparkContext("local[4]", "test", conf) + super.beforeAll() + } + + override def afterAll() { + LocalSparkContext.stop(_sc) + _sc = null + super.afterAll() + } +} + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/EngineWorkflowTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/workflow/EngineWorkflowTest.scala b/core/src/test/scala/org/apache/predictionio/workflow/EngineWorkflowTest.scala new file mode 100644 index 0000000..e69de29
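
The SharedSparkContext trait added in BaseTest.scala above is what gives the new suites their `sc` handle: a single local[4] SparkContext is created in beforeAll and stopped via LocalSparkContext.stop in afterAll. Below is a minimal sketch of a suite built on that trait; the suite name and the job it runs are illustrative only and are not part of this commit.

package org.apache.predictionio.workflow

import org.scalatest.FunSuite

// Hypothetical smoke test (not in the commit): mixes in SharedSparkContext and
// reuses the suite-wide SparkContext exposed as `sc`.
class SharedSparkContextSmokeSuite extends FunSuite with SharedSparkContext {
  test("the shared sc can run a simple job") {
    val rdd = sc.parallelize(1 to 10)
    assert(rdd.count() == 10)        // the context is created once in beforeAll
    assert(rdd.reduce(_ + _) == 55)  // and stopped once in afterAll
  }
}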

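The caching behaviour asserted in FastEvalEngineTest ultimately rests on how the component Params compare: case-class params such as PDataSource2.Params have structural equality, so identical values can key the same cached data-source and preparator output, while a plain class such as PDataSource4.Params compares by reference and is never reused. The snippet below is a standalone illustration of that distinction only; the class names are invented for the example and do not appear in the commit.

// Case classes compare by value; plain classes compare by reference.
case class CaseParams(id: Int, en: Int = 0, qn: Int = 0)
class PlainParams(val id: Int, val en: Int = 0, val qn: Int = 0)

object ParamsEqualityDemo extends App {
  // Equal values of a case class would map to the same cache entry.
  assert(CaseParams(0, en = 3, qn = 10) == CaseParams(0, en = 3, qn = 10))
  // Two equal-looking plain-class instances remain distinct keys, hence no caching.
  assert(new PlainParams(0, 3, 10) != new PlainParams(0, 3, 10))
}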