http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/SampleEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/controller/SampleEngine.scala b/core/src/test/scala/io/prediction/controller/SampleEngine.scala
deleted file mode 100644
index 3a28ca9..0000000
--- a/core/src/test/scala/io/prediction/controller/SampleEngine.scala
+++ /dev/null
@@ -1,472 +0,0 @@
-package io.prediction.controller
-
-import io.prediction.controller.{Params => PIOParams}
-import io.prediction.core._
-
-import grizzled.slf4j.Logger
-import io.prediction.workflow.WorkflowParams
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-object Engine0 {
-  @transient lazy val logger = Logger[this.type] 
-
-  case class TrainingData(id: Int, error: Boolean = false) extends SanityCheck {
-    def sanityCheck(): Unit = {
-      Predef.assert(!error, "Not Error")
-    }
-  }
-
-  case class EvalInfo(id: Int)
-  case class ProcessedData(id: Int, td: TrainingData)
-
-  case class Query(id: Int, ex: Int = 0, qx: Int = 0, supp: Boolean = false)
-  case class Actual(id: Int, ex: Int = 0, qx: Int = 0)
-  case class Prediction(
-    id: Int, q: Query, models: Option[Any] = None, 
-    ps: Seq[Prediction] = Seq[Prediction]())
-
-  class PDataSource0(id: Int = 0) 
-  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
-    def readTraining(sc: SparkContext): TrainingData = {
-      TrainingData(id)
-    }
-  }
-  
-  class PDataSource1(id: Int = 0, en: Int = 0, qn: Int = 0)
-  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
-    def readTraining(sc: SparkContext): TrainingData = TrainingData(id)
-    
-    override
-    def readEval(sc: SparkContext)
-    : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = {
-      (0 until en).map { ex => {
-        val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => {
-          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
-        }}
-        (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq))
-      }}
-    }
-  }
-
-  object PDataSource2 {
-    case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams
-  }
-  
-  class PDataSource2(params: PDataSource2.Params)
-  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
-    val id = params.id
-    def readTraining(sc: SparkContext): TrainingData = TrainingData(id)
-    
-    override
-    def readEval(sc: SparkContext)
-    : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = {
-      (0 until params.en).map { ex => {
-        val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => {
-          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
-        }}
-        (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq))
-      }}
-    }
-  }
-  
-  class PDataSource3(id: Int = 0, error: Boolean = false) 
-  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
-    def readTraining(sc: SparkContext): TrainingData = {
-      TrainingData(id = id, error = error)
-    }
-  }
-  
-  object PDataSource4 {
-    class Params(val id: Int, val en: Int = 0, val qn: Int = 0) 
-      extends PIOParams
-  }
-  
-  class PDataSource4(params: PDataSource4.Params)
-  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
-    val id = params.id
-    def readTraining(sc: SparkContext): TrainingData = TrainingData(id)
-    
-    override
-    def readEval(sc: SparkContext)
-    : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = {
-      (0 until params.en).map { ex => {
-        val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => {
-          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
-        }}
-        (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq))
-      }}
-    }
-  }
-  
-  class LDataSource0(id: Int, en: Int = 0, qn: Int = 0) 
-    extends LDataSource[TrainingData, EvalInfo, Query, Actual] {
-    def readTraining(): TrainingData = TrainingData(id)
-   
-    override
-    def readEval()
-    : Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = {
-      (0 until en).map { ex => {
-        val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => {
-          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
-        }}
-        (TrainingData(id), EvalInfo(id), qaSeq)
-      }}
-    }
-  }
-  
-  object LDataSource1 {
-    case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams
-  }
-  
-  class LDataSource1(params: LDataSource1.Params)
-  extends LDataSource[TrainingData, EvalInfo, Query, Actual] {
-    val id = params.id
-    def readTraining(): TrainingData = TrainingData(id)
-    
-    override
-    def readEval(): Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = {
-      (0 until params.en).map { ex => {
-        val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => {
-          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
-        }}
-        (TrainingData(id), EvalInfo(id), qaSeq)
-      }}
-    }
-  }
-  
-  class PPreparator0(id: Int = 0)
-  extends PPreparator[TrainingData, ProcessedData] {
-    def prepare(sc: SparkContext, td: TrainingData): ProcessedData = {
-      ProcessedData(id, td)
-    }
-  }
-
-  object PPreparator1 {
-    case class Params(id: Int  = 0) extends PIOParams
-  }
-
-  class PPreparator1(params: PPreparator1.Params)
-  extends PPreparator[TrainingData, ProcessedData] {
-    def prepare(sc: SparkContext, td: TrainingData): ProcessedData = {
-      ProcessedData(params.id, td)
-    }
-  }
-
-  class LPreparator0(id: Int = 0) 
-  extends LPreparator[TrainingData, ProcessedData] {
-    def prepare(td: TrainingData): ProcessedData = {
-      ProcessedData(id, td)
-    }
-  }
-  
-  object LPreparator1 {
-    case class Params(id: Int  = 0) extends PIOParams
-  }
-
-  class LPreparator1(params: LPreparator1.Params)
-  extends LPreparator[TrainingData, ProcessedData] {
-    def prepare(td: TrainingData): ProcessedData = {
-      ProcessedData(params.id, td)
-    }
-  }
-
-  object PAlgo0 {
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class PAlgo0(id: Int = 0)
-  extends PAlgorithm[ProcessedData, PAlgo0.Model, Query, Prediction] {
-    def train(sc: SparkContext, pd: ProcessedData)
-    : PAlgo0.Model = PAlgo0.Model(id, pd)
-
-    override
-    def batchPredict(m: PAlgo0.Model, qs: RDD[(Long, Query)])
-    : RDD[(Long, Prediction)] = {
-      qs.mapValues(q => Prediction(id, q, Some(m)))
-    }
-    
-    def predict(m: PAlgo0.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-
-  object PAlgo1 {
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class PAlgo1(id: Int = 0)
-  extends PAlgorithm[ProcessedData, PAlgo1.Model, Query, Prediction] {
-    def train(sc: SparkContext, pd: ProcessedData)
-    : PAlgo1.Model = PAlgo1.Model(id, pd)
-
-    override
-    def batchPredict(m: PAlgo1.Model, qs: RDD[(Long, Query)])
-    : RDD[(Long, Prediction)] = {
-      qs.mapValues(q => Prediction(id, q, Some(m)))
-    }
-
-    def predict(m: PAlgo1.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-  
-  object PAlgo2 {
-    case class Model(id: Int, pd: ProcessedData)
-    case class Params(id: Int) extends PIOParams
-  }
-
-  class PAlgo2(params: PAlgo2.Params)
-  extends PAlgorithm[ProcessedData, PAlgo2.Model, Query, Prediction] {
-    val id = params.id
-
-    def train(sc: SparkContext, pd: ProcessedData)
-    : PAlgo2.Model = PAlgo2.Model(id, pd)
-
-    override
-    def batchPredict(m: PAlgo2.Model, qs: RDD[(Long, Query)])
-    : RDD[(Long, Prediction)] = {
-      qs.mapValues(q => Prediction(id, q, Some(m)))
-    }
-
-    def predict(m: PAlgo2.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-  
-  object PAlgo3 {
-    case class Model(id: Int, pd: ProcessedData)
-    extends LocalFileSystemPersistentModel[Params]
-    
-    object Model extends LocalFileSystemPersistentModelLoader[Params, Model]
-
-    case class Params(id: Int) extends PIOParams
-  }
-
-  class PAlgo3(params: PAlgo3.Params)
-  extends PAlgorithm[ProcessedData, PAlgo3.Model, Query, Prediction] {
-    val id = params.id
-
-    def train(sc: SparkContext, pd: ProcessedData)
-    : PAlgo3.Model = PAlgo3.Model(id, pd)
-
-    override
-    def batchPredict(m: PAlgo3.Model, qs: RDD[(Long, Query)])
-    : RDD[(Long, Prediction)] = {
-      qs.mapValues(q => Prediction(id, q, Some(m)))
-    }
-
-    def predict(m: PAlgo3.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-  
-  object LAlgo0 {
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class LAlgo0(id: Int = 0) 
-  extends LAlgorithm[ProcessedData, LAlgo0.Model, Query, Prediction] {
-    def train(pd: ProcessedData): LAlgo0.Model = LAlgo0.Model(id, pd)
-
-    def predict(m: LAlgo0.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-  
-  object LAlgo1 {
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class LAlgo1(id: Int = 0) 
-  extends LAlgorithm[ProcessedData, LAlgo1.Model, Query, Prediction] {
-    def train(pd: ProcessedData): LAlgo1.Model = LAlgo1.Model(id, pd)
-    
-    def predict(m: LAlgo1.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-  
-  object LAlgo2 {
-    case class Params(id: Int) extends PIOParams
-
-    case class Model(id: Int, pd: ProcessedData)
-    extends LocalFileSystemPersistentModel[EmptyParams]
-    
-    object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model]
-  }
-
-  class LAlgo2(params: LAlgo2.Params) 
-  extends LAlgorithm[ProcessedData, LAlgo2.Model, Query, Prediction] {
-    def train(pd: ProcessedData): LAlgo2.Model = LAlgo2.Model(params.id, pd)
-    
-    def predict(m: LAlgo2.Model, q: Query): Prediction = {
-      Prediction(params.id, q, Some(m))
-    }
-  }
-
-  object LAlgo3 {
-    case class Params(id: Int) extends PIOParams
-
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class LAlgo3(params: LAlgo3.Params) 
-  extends LAlgorithm[ProcessedData, LAlgo3.Model, Query, Prediction] {
-    def train(pd: ProcessedData): LAlgo3.Model = LAlgo3.Model(params.id, pd)
-    
-    def predict(m: LAlgo3.Model, q: Query): Prediction = {
-      Prediction(params.id, q, Some(m))
-    }
-  }
-
-  // N : P2L, since N falls between P and L in the alphabet.
-  object NAlgo0 {
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class NAlgo0 (id: Int = 0)
-  extends P2LAlgorithm[ProcessedData, NAlgo0.Model, Query, Prediction] {
-    def train(sc: SparkContext, pd: ProcessedData)
-    : NAlgo0.Model = NAlgo0.Model(id, pd)
-  
-    def predict(m: NAlgo0.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-
-  object NAlgo1 {
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class NAlgo1 (id: Int = 0)
-  extends P2LAlgorithm[ProcessedData, NAlgo1.Model, Query, Prediction] {
-    def train(sc: SparkContext, pd: ProcessedData)
-    : NAlgo1.Model = NAlgo1.Model(id, pd)
-   
-    def predict(m: NAlgo1.Model, q: Query): Prediction = {
-      Prediction(id, q, Some(m))
-    }
-  }
-  
-  object NAlgo2 {
-    case class Params(id: Int) extends PIOParams
-
-    case class Model(id: Int, pd: ProcessedData)
-    extends LocalFileSystemPersistentModel[EmptyParams]
-    
-    object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model]
-  }
-
-  class NAlgo2(params: NAlgo2.Params) 
-  extends P2LAlgorithm[ProcessedData, NAlgo2.Model, Query, Prediction] {
-    def train(sc: SparkContext, pd: ProcessedData)
-    : NAlgo2.Model = NAlgo2.Model(params.id, pd)
-    
-    def predict(m: NAlgo2.Model, q: Query): Prediction = {
-      Prediction(params.id, q, Some(m))
-    }
-  }
-
-  object NAlgo3 {
-    case class Params(id: Int) extends PIOParams
-
-    case class Model(id: Int, pd: ProcessedData)
-  }
-
-  class NAlgo3(params: NAlgo3.Params) 
-  extends P2LAlgorithm[ProcessedData, NAlgo3.Model, Query, Prediction] {
-    def train(sc: SparkContext, pd: ProcessedData)
-    : NAlgo3.Model = NAlgo3.Model(params.id, pd)
-    
-    def predict(m: NAlgo3.Model, q: Query): Prediction = {
-      Prediction(params.id, q, Some(m))
-    }
-  }
-
-  class LServing0(id: Int = 0) extends LServing[Query, Prediction] {
-    def serve(q: Query, ps: Seq[Prediction]): Prediction = {
-      Prediction(id, q, ps=ps)
-    }
-  }
-
-  object LServing1 {
-    case class Params(id: Int) extends PIOParams
-  }
-  
-  class LServing1(params: LServing1.Params) extends LServing[Query, Prediction] {
-    def serve(q: Query, ps: Seq[Prediction]): Prediction = {
-      Prediction(params.id, q, ps=ps)
-    }
-  }
-  
-  class LServing2(id: Int) extends LServing[Query, Prediction] {
-    override
-    def supplement(q: Query): Query = q.copy(supp = true)
-
-    def serve(q: Query, ps: Seq[Prediction]): Prediction = {
-      Prediction(id, q, ps=ps)
-    }
-  }
-}
-
-object Engine1 {
-  case class EvalInfo(v: Double) extends Serializable
-  case class Query() extends Serializable
-  case class Prediction() extends Serializable
-  case class Actual() extends Serializable
-  case class DSP(v: Double) extends Params
-}
-
-class Engine1 
-extends BaseEngine[
-  Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, Engine1.Actual] {
-
-  def train(
-    sc: SparkContext, 
-    engineParams: EngineParams,
-    engineInstanceId: String = "",
-    params: WorkflowParams = WorkflowParams()): Seq[Any] = Seq[Any]()
-
-  def eval(sc: SparkContext, engineParams: EngineParams, params: WorkflowParams)
-  : Seq[(Engine1.EvalInfo, 
-      RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])] = {
-    val dsp = engineParams.dataSourceParams._2.asInstanceOf[Engine1.DSP]
-    Seq(
-      (Engine1.EvalInfo(dsp.v),
-        sc.emptyRDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)]))
-  }
-}
-
-
-class Metric0
-extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction,
-Engine1.Actual, Double] {
-  override def header: String = "Metric0"
-
-  def calculate(
-    sc: SparkContext, 
-    evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction,
-    Engine1.Actual)])]): Double = {
-    evalDataSet.head._1.v
-  }
-}
-
-object Metric1 {
-  case class Result(c: Int, v: Double) extends Serializable
-}
-
-class Metric1
-extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction,
-Engine1.Actual, Metric1.Result]()(Ordering.by[Metric1.Result, Double](_.v)) {
-  override def header: String = "Metric1"
-
-  def calculate(
-    sc: SparkContext, 
-    evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction,
-    Engine1.Actual)])]): Metric1.Result = {
-    Metric1.Result(0, evalDataSet.head._1.v)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/BaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/workflow/BaseTest.scala b/core/src/test/scala/io/prediction/workflow/BaseTest.scala
deleted file mode 100644
index 4925558..0000000
--- a/core/src/test/scala/io/prediction/workflow/BaseTest.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-//package org.apache.spark
-package io.prediction.workflow
-
-import _root_.io.netty.util.internal.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.BeforeAndAfterEach
-import org.scalatest.Suite
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkConf
-
-
-/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it
-  * after each test. */
-trait LocalSparkContext 
-extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
-
-  @transient var sc: SparkContext = _
-
-  override def beforeAll() {
-    InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
-    super.beforeAll()
-  }
-
-  override def afterEach() {
-    resetSparkContext()
-    super.afterEach()
-  }
-
-  def resetSparkContext() = {
-    LocalSparkContext.stop(sc)
-    sc = null
-  }
-
-}
-
-object LocalSparkContext {
-  def stop(sc: SparkContext) {
-    if (sc != null) {
-      sc.stop()
-    }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
-  }
-
-  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
-  def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
-    try {
-      f(sc)
-    } finally {
-      stop(sc)
-    }
-  }
-
-}
-/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
-trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
-
-  @transient private var _sc: SparkContext = _
-
-  def sc: SparkContext = _sc
-
-  var conf = new SparkConf(false)
-
-  override def beforeAll() {
-    _sc = new SparkContext("local[4]", "test", conf)
-    super.beforeAll()
-  }
-
-  override def afterAll() {
-    LocalSparkContext.stop(_sc)
-    _sc = null
-    super.afterAll()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/EngineWorkflowTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/workflow/EngineWorkflowTest.scala b/core/src/test/scala/io/prediction/workflow/EngineWorkflowTest.scala
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala b/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala
deleted file mode 100644
index 7a50d33..0000000
--- a/core/src/test/scala/io/prediction/workflow/EvaluationWorkflowTest.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-package io.prediction.workflow
-
-import io.prediction.controller._
-
-import org.scalatest.FunSuite
-import org.scalatest.Matchers._
-
-class EvaluationWorkflowSuite extends FunSuite with SharedSparkContext {
-
-  test("Evaluation return best engine params, simple result type: Double") {
-    val engine = new Engine1()
-    val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2))
-    val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
-    val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
-    val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2))
-    val engineParamsList = Seq(ep0, ep1, ep2, ep3)
-
-    val evaluator = MetricEvaluator(new Metric0())
-  
-    object Eval extends Evaluation {
-      engineEvaluator = (new Engine1(), MetricEvaluator(new Metric0()))
-    }
-
-    val result = EvaluationWorkflow.runEvaluation(
-      sc,
-      Eval,
-      engine,
-      engineParamsList,
-      evaluator,
-      WorkflowParams())
-
-    result.bestScore.score shouldBe 0.3
-    result.bestEngineParams shouldBe ep1
-  }
-
-  test("Evaluation return best engine params, complex result type") {
-    val engine = new Engine1()
-    val ep0 = EngineParams(dataSourceParams = Engine1.DSP(0.2))
-    val ep1 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
-    val ep2 = EngineParams(dataSourceParams = Engine1.DSP(0.3))
-    val ep3 = EngineParams(dataSourceParams = Engine1.DSP(-0.2))
-    val engineParamsList = Seq(ep0, ep1, ep2, ep3)
-
-    val evaluator = MetricEvaluator(new Metric1())
-    
-    object Eval extends Evaluation {
-      engineEvaluator = (new Engine1(), MetricEvaluator(new Metric1()))
-    }
-
-    val result = EvaluationWorkflow.runEvaluation(
-      sc,
-      Eval,
-      engine,
-      engineParamsList,
-      evaluator,
-      WorkflowParams())
-  
-    result.bestScore.score shouldBe Metric1.Result(0, 0.3)
-    result.bestEngineParams shouldBe ep1
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala b/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala
deleted file mode 100644
index 34ff751..0000000
--- a/core/src/test/scala/io/prediction/workflow/JsonExtractorSuite.scala
+++ /dev/null
@@ -1,383 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import io.prediction.controller.EngineParams
-import io.prediction.controller.Params
-import io.prediction.controller.Utils
-import org.json4s.CustomSerializer
-import org.json4s.JsonAST.JField
-import org.json4s.JsonAST.JObject
-import org.json4s.JsonAST.JString
-import org.json4s.MappingException
-import org.json4s.native.JsonMethods.compact
-import org.json4s.native.JsonMethods.render
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
-
-class JsonExtractorSuite extends FunSuite with Matchers {
-
-  test("Extract Scala object using option Json4sNative works with optional and 
default value " +
-    "provided") {
-
-    val json = """{"string": "query string", "optional": "optional string", 
"default": "d"}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Json4sNative,
-      json,
-      classOf[ScalaQuery])
-
-    query should be (ScalaQuery("query string", Some("optional string"), "d"))
-  }
-
-  test("Extract Scala object using option Json4sNative works with no optional 
and no default " +
-    "value provided") {
-
-    val json = """{"string": "query string"}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Json4sNative,
-      json,
-      classOf[ScalaQuery])
-
-    query should be (ScalaQuery("query string", None, "default"))
-  }
-
-  test("Extract Scala object using option Json4sNative works with null 
optional and null default" +
-    " value") {
-
-    val json = """{"string": "query string", "optional": null, "default": 
null}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Json4sNative,
-      json,
-      classOf[ScalaQuery])
-
-    query should be (ScalaQuery("query string", None, "default"))
-  }
-
-  test("Extract Scala object using option Both works with optional and default 
value provided") {
-
-    val json = """{"string": "query string", "optional": "optional string", 
"default": "d"}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Json4sNative,
-      json,
-      classOf[ScalaQuery])
-
-    query should be (ScalaQuery("query string", Some("optional string"), "d"))
-  }
-
-  test("Extract Scala object using option Both works with no optional and no 
default value " +
-    "provided") {
-
-    val json = """{"string": "query string"}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Json4sNative,
-      json,
-      classOf[ScalaQuery])
-
-    query should be (ScalaQuery("query string", None, "default"))
-  }
-
-  test("Extract Scala object using option Both works with null optional and 
null default value") {
-
-    val json = """{"string": "query string", "optional": null, "default": 
null}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Json4sNative,
-      json,
-      classOf[ScalaQuery])
-
-    query should be (ScalaQuery("query string", None, "default"))
-  }
-
-  test("Extract Scala object using option Gson should not get default value 
and optional none" +
-    " value") {
-
-    val json = """{"string": "query string"}"""
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Gson,
-      json,
-      classOf[ScalaQuery])
-
-    query should be (ScalaQuery("query string", null, null))
-  }
-
-  test("Extract Scala object using option Gson should throw an exception with 
optional " +
-    "value provided") {
-
-    val json = """{"string": "query string", "optional": "o", "default": 
"d"}"""
-    intercept[RuntimeException] {
-      JsonExtractor.extract(
-        JsonExtractorOption.Gson,
-        json,
-        classOf[ScalaQuery])
-    }
-  }
-
-  test("Extract Java object using option Gson works") {
-
-    val json = """{"q": "query string"}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Gson,
-      json,
-      classOf[JavaQuery])
-
-    query should be (new JavaQuery("query string"))
-  }
-
-  test("Extract Java object using option Both works") {
-
-    val json = """{"q": "query string"}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Both,
-      json,
-      classOf[JavaQuery])
-
-    query should be (new JavaQuery("query string"))
-  }
-
-  test("Extract Java object using option Json4sNative should throw an 
exception") {
-
-    val json = """{"q": "query string"}"""
-
-    intercept[MappingException] {
-      JsonExtractor.extract(
-        JsonExtractorOption.Json4sNative,
-        json,
-        classOf[JavaQuery])
-    }
-  }
-
-  test("Extract Scala object using option Json4sNative with custom 
deserializer") {
-    val json = """{"string": "query string", "optional": "o", "default": 
"d"}"""
-
-    val query = JsonExtractor.extract(
-      JsonExtractorOption.Json4sNative,
-      json,
-      classOf[ScalaQuery],
-      Utils.json4sDefaultFormats + new UpperCaseFormat
-    )
-
-    query should be(ScalaQuery("QUERY STRING", Some("O"), "D"))
-  }
-
-  test("Extract Java object usingoption Gson with custom deserializer") {
-    val json = """{"q": "query string"}"""
-
-    val query = JsonExtractor.extract(
-      extractorOption = JsonExtractorOption.Gson,
-      json = json,
-      clazz = classOf[JavaQuery],
-      gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory)
-    )
-
-    query should be(new JavaQuery("QUERY STRING"))
-  }
-
-  test("Java object to JValue using option Both works") {
-    val query = new JavaQuery("query string")
-    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query)
-
-    compact(render(jValue)) should be ("""{"q":"query string"}""")
-  }
-
-  test("Java object to JValue using option Gson works") {
-    val query = new JavaQuery("query string")
-    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query)
-
-    compact(render(jValue)) should be ("""{"q":"query string"}""")
-  }
-
-  test("Java object to JValue using option Json4sNative results in empty 
Json") {
-    val query = new JavaQuery("query string")
-    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query)
-
-    compact(render(jValue)) should be ("""{}""")
-  }
-
-  test("Scala object to JValue using option Both works") {
-    val query = new ScalaQuery("query string", Some("option"))
-    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Both, query)
-
-    compact(render(jValue)) should
-      be ("""{"string":"query 
string","optional":"option","default":"default"}""")
-  }
-
-  test("Scala object to JValue using option Gson does not serialize optional") 
{
-    val query = new ScalaQuery("query string", Some("option"))
-    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Gson, query)
-
-    compact(render(jValue)) should
-      be ("""{"string":"query string","optional":{},"default":"default"}""")
-  }
-
-  test("Scala object to JValue using option Json4sNative works") {
-    val query = new ScalaQuery("query string", Some("option"))
-    val jValue = JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, query)
-
-    compact(render(jValue)) should
-      be ("""{"string":"query 
string","optional":"option","default":"default"}""")
-  }
-
-  test("Scala object to JValue using option Json4sNative with custom 
serializer") {
-    val query = new ScalaQuery("query string", Some("option"))
-    val jValue = JsonExtractor.toJValue(
-      JsonExtractorOption.Json4sNative,
-      query,
-      Utils.json4sDefaultFormats + new UpperCaseFormat
-    )
-
-    compact(render(jValue)) should
-      be ("""{"string":"QUERY 
STRING","optional":"OPTION","default":"DEFAULT"}""")
-  }
-
-  test("Java object to JValue using option Gson with custom serializer") {
-    val query = new JavaQuery("query string")
-    val jValue = JsonExtractor.toJValue(
-      extractorOption = JsonExtractorOption.Gson,
-      o = query,
-      gsonTypeAdapterFactories = Seq(new JavaQueryTypeAdapterFactory)
-    )
-
-    compact(render(jValue)) should be ("""{"q":"QUERY STRING"}""")
-  }
-
-  test("Java Param to Json using option Both") {
-    val param = ("algo", new JavaParams("parameter"))
-    val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param)
-
-    json should be ("""{"algo":{"p":"parameter"}}""")
-  }
-
-  test("Java Param to Json using option Gson") {
-    val param = ("algo", new JavaParams("parameter"))
-    val json = JsonExtractor.paramToJson(JsonExtractorOption.Gson, param)
-
-    json should be ("""{"algo":{"p":"parameter"}}""")
-  }
-
-  test("Scala Param to Json using option Both") {
-    val param = ("algo", AlgorithmParams("parameter"))
-    val json = JsonExtractor.paramToJson(JsonExtractorOption.Both, param)
-
-    json should be ("""{"algo":{"a":"parameter"}}""")
-  }
-
-  test("Scala Param to Json using option Json4sNative") {
-    val param = ("algo", AlgorithmParams("parameter"))
-    val json = JsonExtractor.paramToJson(JsonExtractorOption.Json4sNative, param)
-
-    json should be ("""{"algo":{"a":"parameter"}}""")
-  }
-
-  test("Java Params to Json using option Both") {
-    val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new 
JavaParams("parameter2")))
-    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
-
-    json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""")
-  }
-
-  test("Java Params to Json using option Gson") {
-    val params = Seq(("algo", new JavaParams("parameter")), ("algo2", new 
JavaParams("parameter2")))
-    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Gson, params)
-
-    json should be ("""[{"algo":{"p":"parameter"}},{"algo2":{"p":"parameter2"}}]""")
-  }
-
-  test("Scala Params to Json using option Both") {
-    val params =
-      Seq(("algo", AlgorithmParams("parameter")), ("algo2", 
AlgorithmParams("parameter2")))
-    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
-
-    json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats))
-  }
-
-  test("Scala Params to Json using option Json4sNative") {
-    val params =
-      Seq(("algo", AlgorithmParams("parameter")), ("algo2", 
AlgorithmParams("parameter2")))
-    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Json4sNative, params)
-
-    json should be (org.json4s.native.Serialization.write(params)(Utils.json4sDefaultFormats))
-  }
-
-  test("Mixed Java and Scala Params to Json using option Both") {
-    val params =
-      Seq(("scala", AlgorithmParams("parameter")), ("java", new 
JavaParams("parameter2")))
-    val json = JsonExtractor.paramsToJson(JsonExtractorOption.Both, params)
-
-    json should be ("""[{"scala":{"a":"parameter"}},{"java":{"p":"parameter2"}}]""")
-  }
-
-  test("Serializing Scala EngineParams works using option Json4sNative") {
-    val ep = new EngineParams(
-      dataSourceParams = ("ds", DataSourceParams("dsp")),
-      algorithmParamsList = Seq(("a0", AlgorithmParams("ap"))))
-
-    val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Json4sNative, ep)
-
-    json should be (
-      """{"dataSourceParams":{"ds":{"a":"dsp"}},"preparatorParams":{"":{}},""" 
+
-        """"algorithmParamsList":[{"a0":{"a":"ap"}}],"servingParams":{"":{}}}""")
-  }
-
-  test("Serializing Java EngineParams works using option Gson") {
-    val ep = new EngineParams(
-      dataSourceParams = ("ds", new JavaParams("dsp")),
-      algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new 
JavaParams("ap2"))))
-
-    val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Gson, ep)
-
-    json should be (
-      """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" 
+
-        """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""")
-  }
-
-  test("Serializing Java EngineParams works using option Both") {
-    val ep = new EngineParams(
-      dataSourceParams = ("ds", new JavaParams("dsp")),
-      algorithmParamsList = Seq(("a0", new JavaParams("ap")), ("a1", new 
JavaParams("ap2"))))
-
-    val json = JsonExtractor.engineParamsToJson(JsonExtractorOption.Both, ep)
-
-    json should be (
-      """{"dataSourceParams":{"ds":{"p":"dsp"}},"preparatorParams":{"":{}},""" 
+
-        """"algorithmParamsList":[{"a0":{"p":"ap"}},{"a1":{"p":"ap2"}}],"servingParams":{"":{}}}""")
-  }
-}
-
-private case class AlgorithmParams(a: String) extends Params
-
-private case class DataSourceParams(a: String) extends Params
-
-private case class ScalaQuery(string: String, optional: Option[String], default: String = "default")
-
-private class UpperCaseFormat extends CustomSerializer[ScalaQuery](format => ( 
{
-  case JObject(JField("string", JString(string)) ::
-    JField("optional", JString(optional)) ::
-    JField("default", JString(default)) ::
-    Nil) => ScalaQuery(string.toUpperCase, Some(optional.toUpperCase), 
default.toUpperCase)
-}, {
-  case x: ScalaQuery =>
-    JObject(
-      JField("string", JString(x.string.toUpperCase)),
-      JField("optional", JString(x.optional.get.toUpperCase)),
-      JField("default", JString(x.default.toUpperCase)))
-}))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala
new file mode 100644
index 0000000..eebe0af
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala
@@ -0,0 +1,615 @@
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.workflow.PersistentModelManifest
+import org.apache.predictionio.workflow.SharedSparkContext
+import org.apache.predictionio.workflow.StopAfterPrepareInterruption
+import org.apache.predictionio.workflow.StopAfterReadInterruption
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.spark.rdd.RDD
+import org.scalatest.Inspectors._
+import org.scalatest.Matchers._
+import org.scalatest.FunSuite
+import org.scalatest.Inside
+
+import scala.util.Random
+
+class EngineSuite
+extends FunSuite with Inside with SharedSparkContext {
+  import org.apache.predictionio.controller.Engine0._
+  @transient lazy val logger = Logger[this.type] 
+
+  test("Engine.train") {
+    val engine = new Engine(
+      classOf[PDataSource2],
+      classOf[PPreparator1],
+      Map("" -> classOf[PAlgo2]),
+      classOf[LServing1])
+
+    val engineParams = EngineParams(
+      dataSourceParams = PDataSource2.Params(0),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
+      servingParams = LServing1.Params(3))
+
+    val models = engine.train(
+      sc, 
+      engineParams, 
+      engineInstanceId = "",
+      params = WorkflowParams())
+    
+    val pd = ProcessedData(1, TrainingData(0))
+
+    // PAlgo2.Model does not implement the IPersistentModel trait, hence the
+    // model extracted after training is Unit.
+    models should contain theSameElementsAs Seq(Unit)
+  }
+
+  test("Engine.train persisting PAlgo.Model") {
+    val engine = new Engine(
+      classOf[PDataSource2],
+      classOf[PPreparator1],
+      Map(
+        "PAlgo2" -> classOf[PAlgo2],
+        "PAlgo3" -> classOf[PAlgo3]
+      ),
+      classOf[LServing1])
+
+    val engineParams = EngineParams(
+      dataSourceParams = PDataSource2.Params(0),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(
+        ("PAlgo2", PAlgo2.Params(2)),
+        ("PAlgo3", PAlgo3.Params(21)),
+        ("PAlgo3", PAlgo3.Params(22))
+      ),
+      servingParams = LServing1.Params(3))
+
+    val pd = ProcessedData(1, TrainingData(0))
+    val model21 = PAlgo3.Model(21, pd)
+    val model22 = PAlgo3.Model(22, pd)
+
+    val models = engine.train(
+      sc, 
+      engineParams, 
+      engineInstanceId = "",
+      params = WorkflowParams())
+
+    val pModel21 = PersistentModelManifest(model21.getClass.getName)
+    val pModel22 = PersistentModelManifest(model22.getClass.getName)
+    
+    models should contain theSameElementsAs Seq(Unit, pModel21, pModel22)
+  }
+
+  test("Engine.train persisting LAlgo.Model") {
+    val engine = Engine(
+      classOf[LDataSource1],
+      classOf[LPreparator1],
+      Map(
+        "LAlgo1" -> classOf[LAlgo1],
+        "LAlgo2" -> classOf[LAlgo2],
+        "LAlgo3" -> classOf[LAlgo3]
+      ),
+      classOf[LServing1])
+
+    val engineParams = EngineParams(
+      dataSourceParams = LDataSource1.Params(0),
+      preparatorParams = LPreparator1.Params(1),
+      algorithmParamsList = Seq(
+        ("LAlgo2", LAlgo2.Params(20)),
+        ("LAlgo2", LAlgo2.Params(21)),
+        ("LAlgo3", LAlgo3.Params(22))),
+      servingParams = LServing1.Params(3))
+
+    val pd = ProcessedData(1, TrainingData(0))
+    val model20 = LAlgo2.Model(20, pd)
+    val model21 = LAlgo2.Model(21, pd)
+    val model22 = LAlgo3.Model(22, pd)
+
+    //val models = engine.train(sc, engineParams, WorkflowParams())
+    val models = engine.train(
+      sc, 
+      engineParams, 
+      engineInstanceId = "",
+      params = WorkflowParams())
+
+    val pModel20 = PersistentModelManifest(model20.getClass.getName)
+    val pModel21 = PersistentModelManifest(model21.getClass.getName)
+    
+    models should contain theSameElementsAs Seq(pModel20, pModel21, model22)
+  }
+  
+  test("Engine.train persisting P&NAlgo.Model") {
+    val engine = new Engine(
+      classOf[PDataSource2],
+      classOf[PPreparator1],
+      Map(
+        "PAlgo2" -> classOf[PAlgo2],
+        "PAlgo3" -> classOf[PAlgo3],
+        "NAlgo2" -> classOf[NAlgo2],
+        "NAlgo3" -> classOf[NAlgo3]
+      ),
+      classOf[LServing1])
+
+    val engineParams = EngineParams(
+      dataSourceParams = PDataSource2.Params(0),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(
+        ("PAlgo2", PAlgo2.Params(20)),
+        ("PAlgo3", PAlgo3.Params(21)),
+        ("PAlgo3", PAlgo3.Params(22)),
+        ("NAlgo2", NAlgo2.Params(23)),
+        ("NAlgo3", NAlgo3.Params(24)),
+        ("NAlgo3", NAlgo3.Params(25))
+      ),
+      servingParams = LServing1.Params(3))
+
+    val pd = ProcessedData(1, TrainingData(0))
+    val model21 = PAlgo3.Model(21, pd)
+    val model22 = PAlgo3.Model(22, pd)
+    val model23 = NAlgo2.Model(23, pd)
+    val model24 = NAlgo3.Model(24, pd)
+    val model25 = NAlgo3.Model(25, pd)
+
+    //val models = engine.train(sc, engineParams, WorkflowParams())
+    val models = engine.train(
+      sc, 
+      engineParams, 
+      engineInstanceId = "",
+      params = WorkflowParams())
+
+    val pModel21 = PersistentModelManifest(model21.getClass.getName)
+    val pModel22 = PersistentModelManifest(model22.getClass.getName)
+    val pModel23 = PersistentModelManifest(model23.getClass.getName)
+    
+    models should contain theSameElementsAs Seq(
+      Unit, pModel21, pModel22, pModel23, model24, model25)
+  }
+
+  test("Engine.eval") {
+    val engine = new Engine(
+      classOf[PDataSource2],
+      classOf[PPreparator1],
+      Map("" -> classOf[PAlgo2]),
+      classOf[LServing1])
+
+    val qn = 10
+    val en = 3
+
+    val engineParams = EngineParams(
+      dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
+      servingParams = LServing1.Params(3))
+
+    val algoCount = engineParams.algorithmParamsList.size
+    val pd = ProcessedData(1, TrainingData(0))
+    val model0 = PAlgo2.Model(2, pd)
+
+    val evalDataSet = engine.eval(sc, engineParams, WorkflowParams())
+
+    evalDataSet should have size en
+
+    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
+      val (evalInfo, qpaRDD) = evalData
+      evalInfo shouldBe EvalInfo(0)
+
+      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
+
+      qpaSeq should have size qn
+
+      forAll (qpaSeq) { case (q, p, a) => 
+        val Query(qId, qEx, qQx, _) = q
+        val Actual(aId, aEx, aQx) = a
+        qId shouldBe aId
+        qEx shouldBe ex
+        aEx shouldBe ex
+        qQx shouldBe aQx
+
+        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
+          pId shouldBe 3
+          pQ shouldBe q
+          pModels shouldBe None
+          pPs should have size algoCount
+          pPs shouldBe Seq(
+            Prediction(id = 2, q = q, models = Some(model0)))
+        }}
+      }
+    }}
+  }
+
+  test("Engine.prepareDeploy PAlgo") {
+    val engine = new Engine(
+      classOf[PDataSource2],
+      classOf[PPreparator1],
+      Map(
+        "PAlgo2" -> classOf[PAlgo2],
+        "PAlgo3" -> classOf[PAlgo3],
+        "NAlgo2" -> classOf[NAlgo2],
+        "NAlgo3" -> classOf[NAlgo3]
+      ),
+      classOf[LServing1])
+
+    val engineParams = EngineParams(
+      dataSourceParams = PDataSource2.Params(0),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(
+        ("PAlgo2", PAlgo2.Params(20)),
+        ("PAlgo3", PAlgo3.Params(21)),
+        ("PAlgo3", PAlgo3.Params(22)),
+        ("NAlgo2", NAlgo2.Params(23)),
+        ("NAlgo3", NAlgo3.Params(24)),
+        ("NAlgo3", NAlgo3.Params(25))
+      ),
+      servingParams = LServing1.Params(3))
+
+    val pd = ProcessedData(1, TrainingData(0))
+    val model20 = PAlgo2.Model(20, pd)
+    val model21 = PAlgo3.Model(21, pd)
+    val model22 = PAlgo3.Model(22, pd)
+    val model23 = NAlgo2.Model(23, pd)
+    val model24 = NAlgo3.Model(24, pd)
+    val model25 = NAlgo3.Model(25, pd)
+
+    val rand = new Random()
+
+    val fakeEngineInstanceId = s"FakeInstanceId-${rand.nextLong()}"
+
+    val persistedModels = engine.train(
+      sc,
+      engineParams,
+      engineInstanceId = fakeEngineInstanceId,
+      params = WorkflowParams()
+    )
+
+    val deployableModels = engine.prepareDeploy(
+      sc,
+      engineParams,
+      fakeEngineInstanceId,
+      persistedModels,
+      params = WorkflowParams()
+    )
+
+    deployableModels should contain theSameElementsAs Seq(
+      model20, model21, model22, model23, model24, model25)
+  }
+}
+
+class EngineTrainSuite extends FunSuite with SharedSparkContext {
+  import org.apache.predictionio.controller.Engine0._
+  val defaultWorkflowParams: WorkflowParams = WorkflowParams()
+
+  test("Parallel DS/P/Algos") {
+    val models = Engine.train(
+      sc,
+      new PDataSource0(0),
+      new PPreparator0(1),
+      Seq(
+        new PAlgo0(2),
+        new PAlgo1(3),
+        new PAlgo0(4)),
+      defaultWorkflowParams
+    )
+
+    val pd = ProcessedData(1, TrainingData(0))
+
+    models should contain theSameElementsAs Seq(
+      PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd))
+  }
+
+  test("Local DS/P/Algos") {
+    val models = Engine.train(
+      sc,
+      new LDataSource0(0),
+      new LPreparator0(1),
+      Seq(
+        new LAlgo0(2),
+        new LAlgo1(3),
+        new LAlgo0(4)),
+      defaultWorkflowParams
+    )
+    
+    val pd = ProcessedData(1, TrainingData(0))
+
+    val expectedResults = Seq(
+      LAlgo0.Model(2, pd),
+      LAlgo1.Model(3, pd),
+      LAlgo0.Model(4, pd))
+
+    forAll(models.zip(expectedResults)) { case (model, expected) => 
+      model shouldBe a [RDD[_]]
+      val localModel = model.asInstanceOf[RDD[_]].collect
+      localModel should contain theSameElementsAs Seq(expected)
+    }
+  }
+
+  test("P2L DS/P/Algos") {
+    val models = Engine.train(
+      sc,
+      new PDataSource0(0),
+      new PPreparator0(1),
+      Seq(
+        new NAlgo0(2),
+        new NAlgo1(3),
+        new NAlgo0(4)),
+      defaultWorkflowParams
+    )
+
+    val pd = ProcessedData(1, TrainingData(0))
+    
+    models should contain theSameElementsAs Seq(
+      NAlgo0.Model(2, pd), NAlgo1.Model(3, pd), NAlgo0.Model(4, pd))
+  }
+  
+  test("Parallel DS/P/Algos Stop-After-Read") {
+    val workflowParams = defaultWorkflowParams.copy(
+      stopAfterRead = true)
+
+    an [StopAfterReadInterruption] should be thrownBy Engine.train(
+      sc,
+      new PDataSource0(0),
+      new PPreparator0(1),
+      Seq(
+        new PAlgo0(2),
+        new PAlgo1(3),
+        new PAlgo0(4)),
+      workflowParams
+    )
+  }
+  
+  test("Parallel DS/P/Algos Stop-After-Prepare") {
+    val workflowParams = defaultWorkflowParams.copy(
+      stopAfterPrepare = true)
+
+    an [StopAfterPrepareInterruption] should be thrownBy Engine.train(
+      sc,
+      new PDataSource0(0),
+      new PPreparator0(1),
+      Seq(
+        new PAlgo0(2),
+        new PAlgo1(3),
+        new PAlgo0(4)),
+      workflowParams
+    )
+  }
+  
+  test("Parallel DS/P/Algos Dirty TrainingData") {
+    val workflowParams = defaultWorkflowParams.copy(
+      skipSanityCheck = false)
+
+    an [AssertionError] should be thrownBy Engine.train(
+      sc,
+      new PDataSource3(0, error = true),
+      new PPreparator0(1),
+      Seq(
+        new PAlgo0(2),
+        new PAlgo1(3),
+        new PAlgo0(4)),
+      workflowParams
+    )
+  }
+  
+  test("Parallel DS/P/Algos Dirty TrainingData But Skip Check") {
+    val workflowParams = defaultWorkflowParams.copy(
+      skipSanityCheck = true)
+
+    val models = Engine.train(
+      sc,
+      new PDataSource3(0, error = true),
+      new PPreparator0(1),
+      Seq(
+        new PAlgo0(2),
+        new PAlgo1(3),
+        new PAlgo0(4)),
+      workflowParams
+    )
+    
+    val pd = ProcessedData(1, TrainingData(0, error = true))
+
+    models should contain theSameElementsAs Seq(
+      PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd))
+  }
+}
+
+
+class EngineEvalSuite
+extends FunSuite with Inside with SharedSparkContext {
+  import org.apache.predictionio.controller.Engine0._
+
+  @transient lazy val logger = Logger[this.type] 
+  
+  test("Simple Parallel DS/P/A/S") {
+    val en = 2
+    val qn = 5
+
+    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
+    Engine.eval(
+      sc,
+      new PDataSource1(id = 1, en = en, qn = qn),
+      new PPreparator0(id = 2),
+      Seq(new PAlgo0(id = 3)),
+      new LServing0(id = 10))
+
+    val pd = ProcessedData(2, TrainingData(1))
+    val model0 = PAlgo0.Model(3, pd)
+
+    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
+      val (evalInfo, qpaRDD) = evalData
+      evalInfo shouldBe EvalInfo(1)
+
+      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
+      forAll (qpaSeq) { case (q, p, a) => 
+        val Query(qId, qEx, qQx, _) = q
+        val Actual(aId, aEx, aQx) = a
+        qId shouldBe aId
+        qEx shouldBe ex
+        aEx shouldBe ex
+        qQx shouldBe aQx
+
+        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
+          pId shouldBe 10
+          pQ shouldBe q
+          pModels shouldBe None
+          pPs should have size 1
+          pPs shouldBe Seq(
+            Prediction(id = 3, q = q, models = Some(model0)))
+        }}
+      }
+
+    }}
+
+  }
+
+  test("Parallel DS/P/A/S") {
+    val en = 2
+    val qn = 5
+
+    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
+    Engine.eval(
+      sc,
+      new PDataSource1(id = 1, en = en, qn = qn),
+      new PPreparator0(id = 2),
+      Seq(
+        new PAlgo0(id = 3), 
+        new PAlgo1(id = 4),
+        new NAlgo1(id = 5)),
+      new LServing0(id = 10))
+
+    val pd = ProcessedData(2, TrainingData(1))
+    val model0 = PAlgo0.Model(3, pd)
+    val model1 = PAlgo1.Model(4, pd)
+    val model2 = NAlgo1.Model(5, pd)
+
+    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
+      val (evalInfo, qpaRDD) = evalData
+      evalInfo shouldBe EvalInfo(1)
+
+      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
+      forAll (qpaSeq) { case (q, p, a) => 
+        val Query(qId, qEx, qQx, _) = q
+        val Actual(aId, aEx, aQx) = a
+        qId shouldBe aId
+        qEx shouldBe ex
+        aEx shouldBe ex
+        qQx shouldBe aQx
+
+        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
+          pId shouldBe 10
+          pQ shouldBe q
+          pModels shouldBe None
+          pPs should have size 3
+          pPs shouldBe Seq(
+            Prediction(id = 3, q = q, models = Some(model0)),
+            Prediction(id = 4, q = q, models = Some(model1)),
+            Prediction(id = 5, q = q, models = Some(model2))
+          )
+        }}
+      }
+    }}
+  }
+  
+  test("Parallel DS/P/A/S with Supplemented Query") {
+    val en = 2
+    val qn = 5
+
+    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
+    Engine.eval(
+      sc,
+      new PDataSource1(id = 1, en = en, qn = qn),
+      new PPreparator0(id = 2),
+      Seq(
+        new PAlgo0(id = 3), 
+        new PAlgo1(id = 4),
+        new NAlgo1(id = 5)),
+      new LServing2(id = 10))
+
+    val pd = ProcessedData(2, TrainingData(1))
+    val model0 = PAlgo0.Model(3, pd)
+    val model1 = PAlgo1.Model(4, pd)
+    val model2 = NAlgo1.Model(5, pd)
+
+    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
+      val (evalInfo, qpaRDD) = evalData
+      evalInfo shouldBe EvalInfo(1)
+
+      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
+      forAll (qpaSeq) { case (q, p, a) => 
+        val Query(qId, qEx, qQx, qSupp) = q
+        val Actual(aId, aEx, aQx) = a
+        qId shouldBe aId
+        qEx shouldBe ex
+        aEx shouldBe ex
+        qQx shouldBe aQx
+        qSupp shouldBe false
+
+        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
+          pId shouldBe 10
+          pQ shouldBe q
+          pModels shouldBe None
+          pPs should have size 3
+          // Queries inside the predictions should have supp set to true, since
+          // they represent what the algorithms see.
+          val qSupp = q.copy(supp = true)
+          pPs shouldBe Seq(
+            Prediction(id = 3, q = qSupp, models = Some(model0)),
+            Prediction(id = 4, q = qSupp, models = Some(model1)),
+            Prediction(id = 5, q = qSupp, models = Some(model2))
+          )
+        }}
+      }
+    }}
+  }
+  
+  test("Local DS/P/A/S") {
+    val en = 2
+    val qn = 5
+
+    val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = 
+    Engine.eval(
+      sc,
+      new LDataSource0(id = 1, en = en, qn = qn),
+      new LPreparator0(id = 2),
+      Seq(
+        new LAlgo0(id = 3), 
+        new LAlgo1(id = 4),
+        new LAlgo1(id = 5)),
+      new LServing0(id = 10))
+
+    val pd = ProcessedData(2, TrainingData(1))
+    val model0 = LAlgo0.Model(3, pd)
+    val model1 = LAlgo1.Model(4, pd)
+    val model2 = LAlgo1.Model(5, pd)
+
+    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
+      val (evalInfo, qpaRDD) = evalData
+      evalInfo shouldBe EvalInfo(1)
+
+      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
+      forAll (qpaSeq) { case (q, p, a) => 
+        val Query(qId, qEx, qQx, _) = q
+        val Actual(aId, aEx, aQx) = a
+        qId shouldBe aId
+        qEx shouldBe ex
+        aEx shouldBe ex
+        qQx shouldBe aQx
+
+        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
+          pId shouldBe 10
+          pQ shouldBe q
+          pModels shouldBe None
+          pPs should have size 3
+          pPs shouldBe Seq(
+            Prediction(id = 3, q = q, models = Some(model0)),
+            Prediction(id = 4, q = q, models = Some(model1)),
+            Prediction(id = 5, q = q, models = Some(model2))
+          )
+        }}
+      }
+
+    }}
+
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala
new file mode 100644
index 0000000..86fe68c
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/controller/EvaluationTest.scala
@@ -0,0 +1,46 @@
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.workflow.SharedSparkContext
+
+import org.scalatest.FunSuite
+import org.scalatest.Inside
+import org.scalatest.Matchers._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+object EvaluationSuite {
+  import org.apache.predictionio.controller.TestEvaluator._
+
+  class Metric0 extends Metric[EvalInfo, Query, Prediction, Actual, Int] {
+    def calculate(
+      sc: SparkContext,
+      evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])]): Int = 1
+  }
+
+  object Evaluation0 extends Evaluation {
+    engineMetric = (new FakeEngine(1, 1, 1), new Metric0())
+  }
+}
+
+
+class EvaluationSuite
+extends FunSuite with Inside with SharedSparkContext {
+  import org.apache.predictionio.controller.EvaluationSuite._
+
+  test("Evaluation makes MetricEvaluator") {
+    // MetricEvaluator is typed [EvalInfo, Query, Prediction, Actual, Int];
+    // however, this type information is erased on the JVM. The scalatest docs
+    // recommend using wildcards.
+    Evaluation0.evaluator shouldBe a [MetricEvaluator[_, _, _, _, _]]
+  }
+
+  test("Load from class path") {
+    val r = org.apache.predictionio.workflow.WorkflowUtils.getEvaluation(
+      "org.apache.predictionio.controller.EvaluationSuite.Evaluation0",
+      getClass.getClassLoader)
+
+    r._2 shouldBe EvaluationSuite.Evaluation0
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala
new file mode 100644
index 0000000..c2668ac
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/controller/EvaluatorTest.scala
@@ -0,0 +1,93 @@
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core._
+import org.apache.predictionio.workflow.WorkflowParams
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+object TestEvaluator {
+  case class EvalInfo(id: Int, ex: Int)
+  case class Query(id: Int, ex: Int, qx: Int)
+  case class Prediction(id: Int, ex: Int, qx: Int)
+  case class Actual(id: Int, ex: Int, qx: Int)
+
+  class FakeEngine(val id: Int, val en: Int, val qn: Int)
+  extends BaseEngine[EvalInfo, Query, Prediction, Actual] {
+    def train(
+      sc: SparkContext, 
+      engineParams: EngineParams,
+      instanceId: String = "",
+      params: WorkflowParams = WorkflowParams()
+    ): Seq[Any] = {
+      Seq[Any]()
+    }
+
+    def eval(
+      sc: SparkContext, 
+      engineParams: EngineParams, 
+      params: WorkflowParams)
+    : Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = {
+      (0 until en).map { ex => {
+        val qpas = (0 until qn).map { qx => {
+          (Query(id, ex, qx), Prediction(id, ex, qx), Actual(id, ex, qx))
+        }}
+  
+        (EvalInfo(id = id, ex = ex), sc.parallelize(qpas))
+      }}
+    }
+  
+  }
+
+  /*
+  class Evaluator0 extends Evaluator[EvalInfo, Query, Prediction, Actual,
+      (Query, Prediction, Actual), 
+      (EvalInfo, Seq[(Query, Prediction, Actual)]),
+      Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))]
+      ] {
+
+    def evaluateUnit(q: Query, p: Prediction, a: Actual)
+    : (Query, Prediction, Actual) = (q, p, a)
+
+    def evaluateSet(
+        evalInfo: EvalInfo, 
+        eus: Seq[(Query, Prediction, Actual)])
+    : (EvalInfo, Seq[(Query, Prediction, Actual)]) = (evalInfo, eus)
+
+    def evaluateAll(
+      input: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))]) 
+    = input
+  }
+  */
+
+}
+
+/*
+class EvaluatorSuite
+extends FunSuite with Inside with SharedSparkContext {
+  import org.apache.predictionio.controller.TestEvaluator._
+  @transient lazy val logger = Logger[this.type] 
+
+  test("Evaluator.evaluate") {
+    val engine = new FakeEngine(1, 3, 10)
+    val evaluator = new Evaluator0()
+  
+    val evalDataSet = engine.eval(sc, null.asInstanceOf[EngineParams])
+    val er: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))] =
+      evaluator.evaluateBase(sc, evalDataSet)
+
+    evalDataSet.zip(er).map { case (input, output) => {
+      val (inputEvalInfo, inputQpaRDD) = input
+      val (outputEvalInfo, (outputEvalInfo2, outputQpaSeq)) = output
+      
+      inputEvalInfo shouldBe outputEvalInfo
+      inputEvalInfo shouldBe outputEvalInfo2
+      
+      val inputQpaSeq: Array[(Query, Prediction, Actual)] = inputQpaRDD.collect
+
+      inputQpaSeq.size should be (outputQpaSeq.size)
+      // TODO. match inputQpa and outputQpa content.
+    }}
+  }
+}
+*/

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala b/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala
new file mode 100644
index 0000000..a4dc42f
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/controller/FastEvalEngineTest.scala
@@ -0,0 +1,181 @@
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.workflow.WorkflowParams
+import org.scalatest.FunSuite
+import org.scalatest.Inside
+import org.scalatest.Matchers._
+import org.scalatest.Inspectors._
+
+import org.apache.predictionio.workflow.SharedSparkContext
+
+class FastEngineSuite
+extends FunSuite with Inside with SharedSparkContext {
+  import org.apache.predictionio.controller.Engine0._
+  
+  test("Single Evaluation") {
+    val engine = new FastEvalEngine(
+      Map("" -> classOf[PDataSource2]),
+      Map("" -> classOf[PPreparator1]),
+      Map(
+        "PAlgo2" -> classOf[PAlgo2],
+        "PAlgo3" -> classOf[PAlgo3]
+      ),
+      Map("" -> classOf[LServing1]))
+
+    val qn = 10
+    val en = 3
+
+    val engineParams = EngineParams(
+      dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(
+        ("PAlgo2", PAlgo2.Params(20)),
+        ("PAlgo2", PAlgo2.Params(21)),
+        ("PAlgo3", PAlgo3.Params(22))
+      ),
+      servingParams = LServing1.Params(3))
+
+    val algoCount = engineParams.algorithmParamsList.size
+    val pd = ProcessedData(1, TrainingData(0))
+    val model0 = PAlgo2.Model(20, pd)
+    val model1 = PAlgo2.Model(21, pd)
+    val model2 = PAlgo3.Model(22, pd)
+
+    val evalDataSet = engine.eval(sc, engineParams, WorkflowParams())
+
+    evalDataSet should have size en
+
+    forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => {
+      val (evalInfo, qpaRDD) = evalData
+      evalInfo shouldBe EvalInfo(0)
+
+      val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect
+
+      qpaSeq should have size qn
+
+      forAll (qpaSeq) { case (q, p, a) => 
+        val Query(qId, qEx, qQx, _) = q
+        val Actual(aId, aEx, aQx) = a
+        qId shouldBe aId
+        qEx shouldBe ex
+        aEx shouldBe ex
+        qQx shouldBe aQx
+
+        inside (p) { case Prediction(pId, pQ, pModels, pPs) => {
+          pId shouldBe 3
+          pQ shouldBe q
+          pModels shouldBe None
+          pPs should have size algoCount
+          pPs shouldBe Seq(
+            Prediction(id = 20, q = q, models = Some(model0)),
+            Prediction(id = 21, q = q, models = Some(model1)),
+            Prediction(id = 22, q = q, models = Some(model2))
+          )
+        }}
+      }
+    }}
+  }
+
+  test("Batch Evaluation") {
+    val engine = new FastEvalEngine(
+      Map("" -> classOf[PDataSource2]),
+      Map("" -> classOf[PPreparator1]),
+      Map("" -> classOf[PAlgo2]),
+      Map("" -> classOf[LServing1]))
+
+    val qn = 10
+    val en = 3
+
+    val baseEngineParams = EngineParams(
+      dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
+      servingParams = LServing1.Params(3))
+
+    val ep0 = baseEngineParams
+    val ep1 = baseEngineParams.copy(
+      algorithmParamsList = Seq(("", PAlgo2.Params(2))))
+    val ep2 = baseEngineParams.copy(
+      algorithmParamsList = Seq(("", PAlgo2.Params(20))))
+
+    val engineEvalDataSet = engine.batchEval(
+      sc,
+      Seq(ep0, ep1, ep2),
+      WorkflowParams())
+
+    val evalDataSet0 = engineEvalDataSet(0)._2
+    val evalDataSet1 = engineEvalDataSet(1)._2
+    val evalDataSet2 = engineEvalDataSet(2)._2
+
+    evalDataSet0 shouldBe evalDataSet1
+    evalDataSet0 should not be evalDataSet2
+    evalDataSet1 should not be evalDataSet2
+
+    // evalDataSet0._1 should be theSameInstanceAs evalDataSet1._1
+    // When things are cached correctly, evalDataSet0 and 1 should share the
+    // same EI
+    evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => {
+      e0._1 should be theSameInstanceAs e1._1
+      e0._2 should be theSameInstanceAs e1._2
+    }}
+   
+    // Likewise for set1 and set2; the EIs are shared, but the QPA-RDDs should differ.
+    evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => {
+      e1._1 should be theSameInstanceAs e2._1
+      val e1Qpa = e1._2
+      val e2Qpa = e2._2
+      e1Qpa should not be theSameInstanceAs (e2Qpa)
+    }}
+  }
+  
+  test("Not cached when isEqual not implemented") {
+    // PDataSource4.Params is a plain class, not a case class, so it would need
+    // to implement the isEqual function for hashing (and hence caching).
+    val engine = new FastEvalEngine(
+      Map("" -> classOf[PDataSource4]),
+      Map("" -> classOf[PPreparator1]),
+      Map("" -> classOf[PAlgo2]),
+      Map("" -> classOf[LServing1]))
+
+    val qn = 10
+    val en = 3
+
+    val baseEngineParams = EngineParams(
+      dataSourceParams = new PDataSource4.Params(id = 0, en = en, qn = qn),
+      preparatorParams = PPreparator1.Params(1),
+      algorithmParamsList = Seq(("", PAlgo2.Params(2))),
+      servingParams = LServing1.Params(3))
+
+    val ep0 = baseEngineParams
+    val ep1 = baseEngineParams.copy(
+      algorithmParamsList = Seq(("", PAlgo2.Params(3))))
+    // ep2's dataSourceParams is a different instance from ep0's.
+    val ep2 = baseEngineParams.copy(
+      dataSourceParams = ("", new PDataSource4.Params(id = 0, en = en, qn = qn)),
+      algorithmParamsList = Seq(("", PAlgo2.Params(3))))
+
+    val engineEvalDataSet = engine.batchEval(
+      sc,
+      Seq(ep0, ep1, ep2),
+      WorkflowParams())
+
+    val evalDataSet0 = engineEvalDataSet(0)._2
+    val evalDataSet1 = engineEvalDataSet(1)._2
+    val evalDataSet2 = engineEvalDataSet(2)._2
+
+    evalDataSet0 should not be evalDataSet1
+    evalDataSet0 should not be evalDataSet2
+    evalDataSet1 should not be evalDataSet2
+
+    // Set0 should have the same EI as Set1, since their dsp is the same instance.
+    evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => {
+      e0._1 should be theSameInstanceAs (e1._1)
+    }}
+  
+    // Set1 should have a different EI from Set2, since Set2's dsp is a
+    // different instance.
+    evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => {
+      e1._1 should not be theSameInstanceAs (e2._1)
+    }}
+  }
+}
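
The last test above hinges on parameter equality: FastEvalEngine can only reuse
cached intermediate results when two parameter objects compare equal, which case
classes get for free but plain classes such as PDataSource4.Params do not. A
minimal sketch, assuming only the Params trait from this package, of a
non-case-class params type that still compares by value so caching could apply
(the class name is hypothetical, not part of this commit):

    import org.apache.predictionio.controller.Params

    // Hypothetical params class: value-based equals/hashCode restore the
    // caching behaviour that a case class would provide automatically.
    class CachedDSParams(val id: Int, val en: Int = 0, val qn: Int = 0)
      extends Params {
      override def equals(other: Any): Boolean = other match {
        case that: CachedDSParams =>
          id == that.id && en == that.en && qn == that.qn
        case _ => false
      }
      override def hashCode(): Int = (id, en, qn).hashCode()
    }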

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala b/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala
new file mode 100644
index 0000000..a7e397a
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/controller/MetricEvaluatorTest.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.workflow.SharedSparkContext
+import org.apache.predictionio.workflow.WorkflowParams
+import org.scalatest.FunSuite
+
+object MetricEvaluatorSuite {
+  case class Metric0() extends SumMetric[EmptyParams, Int, Int, Int, Int] {
+    def calculate(q: Int, p: Int, a: Int): Int = q
+  }
+
+  object Evaluation0 extends Evaluation {}
+}
+
+class MetricEvaluatorDevSuite extends FunSuite with SharedSparkContext {
+  import org.apache.predictionio.controller.MetricEvaluatorSuite._
+
+  test("a") {
+    val metricEvaluator = MetricEvaluator(
+      Metric0(),
+      Seq(Metric0(), Metric0())
+    )
+ 
+    val engineEvalDataSet = Seq(
+      (EngineParams(), Seq(
+        (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))),
+      (EngineParams(), Seq(
+        (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))))
+
+    val r = metricEvaluator.evaluateBase(
+      sc,
+      Evaluation0,
+      engineEvalDataSet,
+      WorkflowParams())
+
+  }
+}
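
For reference, Metric0 above is a SumMetric whose per-record value is simply the
query component, so each of the two engine-params entries in the fixture sums to
1 + 2 = 3 for the primary metric and for each of the two additional metrics; the
test currently only exercises that evaluateBase runs without error, since the
result r is not asserted on.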

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala b/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala
new file mode 100644
index 0000000..67975b1
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/controller/MetricTest.scala
@@ -0,0 +1,143 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.workflow.SharedSparkContext
+
+import grizzled.slf4j.Logger
+import org.scalatest.Matchers._
+import org.scalatest.FunSuite
+import org.scalatest.Inside
+
+object MetricDevSuite {
+  class QIntSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Int] {
+    def calculate(q: Int, p: Int, a: Int): Int = q
+  }
+  
+  class QDoubleSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Double] {
+    def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
+  }
+  
+  class QAverageMetric extends AverageMetric[EmptyParams, Int, Int, Int] {
+    def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
+  }
+  
+  class QOptionAverageMetric extends OptionAverageMetric[EmptyParams, Int, Int, Int] {
+    def calculate(q: Int, p: Int, a: Int): Option[Double] = {
+      if (q < 0) { None } else { Some(q.toDouble) }
+    }
+  }
+  
+  class QStdevMetric extends StdevMetric[EmptyParams, Int, Int, Int] {
+    def calculate(q: Int, p: Int, a: Int): Double = q.toDouble
+  }
+  
+  class QOptionStdevMetric extends OptionStdevMetric[EmptyParams, Int, Int, Int] {
+    def calculate(q: Int, p: Int, a: Int): Option[Double] = {
+      if (q < 0) { None } else { Some(q.toDouble) }
+    }
+  }
+  
+}
+
+class MetricDevSuite
+extends FunSuite with Inside with SharedSparkContext {
+  @transient lazy val logger = Logger[this.type] 
+  
+  test("Average Metric") {
+    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
+    val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
+
+    val evalDataSet = Seq(
+      (EmptyParams(), sc.parallelize(qpaSeq0)),
+      (EmptyParams(), sc.parallelize(qpaSeq1)))
+  
+    val m = new MetricDevSuite.QAverageMetric()
+    val result = m.calculate(sc, evalDataSet)
+    
+    result shouldBe (21.0 / 6)
+  }
+  
+  test("Option Average Metric") {
+    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
+    val qpaSeq1 = Seq((-4, 0, 0), (-5, 0, 0), (6, 0, 0))
+
+    val evalDataSet = Seq(
+      (EmptyParams(), sc.parallelize(qpaSeq0)),
+      (EmptyParams(), sc.parallelize(qpaSeq1)))
+  
+    val m = new MetricDevSuite.QOptionAverageMetric()
+    val result = m.calculate(sc, evalDataSet)
+    
+    result shouldBe (12.0 / 4)
+  }
+  
+  test("Stdev Metric") {
+    val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0))
+    val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0))
+
+    val evalDataSet = Seq(
+      (EmptyParams(), sc.parallelize(qpaSeq0)),
+      (EmptyParams(), sc.parallelize(qpaSeq1)))
+  
+    val m = new MetricDevSuite.QStdevMetric()
+    val result = m.calculate(sc, evalDataSet)
+    
+    result shouldBe 2.0
+  }
+  
+  test("Option Stdev Metric") {
+    val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0))
+    val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0), (-5, 0, 0))
+
+    val evalDataSet = Seq(
+      (EmptyParams(), sc.parallelize(qpaSeq0)),
+      (EmptyParams(), sc.parallelize(qpaSeq1)))
+  
+    val m = new MetricDevSuite.QOptionStdevMetric()
+    val result = m.calculate(sc, evalDataSet)
+    
+    result shouldBe 2.0
+  }
+
+  test("Sum Metric [Int]") {
+    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
+    val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
+
+    val evalDataSet = Seq(
+      (EmptyParams(), sc.parallelize(qpaSeq0)),
+      (EmptyParams(), sc.parallelize(qpaSeq1)))
+  
+    val m = new MetricDevSuite.QIntSumMetric()
+    val result = m.calculate(sc, evalDataSet)
+    
+    result shouldBe 21
+  }
+
+  test("Sum Metric [Double]") {
+    val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0))
+    val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0))
+
+    val evalDataSet = Seq(
+      (EmptyParams(), sc.parallelize(qpaSeq0)),
+      (EmptyParams(), sc.parallelize(qpaSeq1)))
+  
+    val m = new MetricDevSuite.QDoubleSumMetric()
+    val result = m.calculate(sc, evalDataSet)
+    
+    result shouldBe 21.0
+  }
+}
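
The expected values follow directly from the fixtures: the average metric sums
1 through 6 over six records, (1+2+3+4+5+6) / 6 = 3.5; the Option variant drops
the two negative entries, (1+2+3+6) / 4 = 3.0; the stdev cases take the
population standard deviation of four 1s and four 5s (mean 3, variance 4, stdev
2), with the Option variant discarding the single negative record first; and the
sum metrics total 21 as an Int and as a Double respectively.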

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala
new file mode 100644
index 0000000..e238e86
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/controller/SampleEngine.scala
@@ -0,0 +1,472 @@
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.controller.{Params => PIOParams}
+import org.apache.predictionio.core._
+
+import grizzled.slf4j.Logger
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+object Engine0 {
+  @transient lazy val logger = Logger[this.type] 
+
+  case class TrainingData(id: Int, error: Boolean = false) extends SanityCheck {
+    def sanityCheck(): Unit = {
+      Predef.assert(!error, "Not Error")
+    }
+  }
+
+  case class EvalInfo(id: Int)
+  case class ProcessedData(id: Int, td: TrainingData)
+
+  case class Query(id: Int, ex: Int = 0, qx: Int = 0, supp: Boolean = false)
+  case class Actual(id: Int, ex: Int = 0, qx: Int = 0)
+  case class Prediction(
+    id: Int, q: Query, models: Option[Any] = None, 
+    ps: Seq[Prediction] = Seq[Prediction]())
+
+  class PDataSource0(id: Int = 0) 
+  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
+    def readTraining(sc: SparkContext): TrainingData = {
+      TrainingData(id)
+    }
+  }
+  
+  class PDataSource1(id: Int = 0, en: Int = 0, qn: Int = 0)
+  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
+    def readTraining(sc: SparkContext): TrainingData = TrainingData(id)
+    
+    override
+    def readEval(sc: SparkContext)
+    : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = {
+      (0 until en).map { ex => {
+        val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => {
+          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
+        }}
+        (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq))
+      }}
+    }
+  }
+
+  object PDataSource2 {
+    case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams
+  }
+  
+  class PDataSource2(params: PDataSource2.Params)
+  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
+    val id = params.id
+    def readTraining(sc: SparkContext): TrainingData = TrainingData(id)
+    
+    override
+    def readEval(sc: SparkContext)
+    : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = {
+      (0 until params.en).map { ex => {
+        val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => {
+          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
+        }}
+        (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq))
+      }}
+    }
+  }
+  
+  class PDataSource3(id: Int = 0, error: Boolean = false) 
+  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
+    def readTraining(sc: SparkContext): TrainingData = {
+      TrainingData(id = id, error = error)
+    }
+  }
+  
+  object PDataSource4 {
+    class Params(val id: Int, val en: Int = 0, val qn: Int = 0) 
+      extends PIOParams
+  }
+  
+  class PDataSource4(params: PDataSource4.Params)
+  extends PDataSource[TrainingData, EvalInfo, Query, Actual] {
+    val id = params.id
+    def readTraining(sc: SparkContext): TrainingData = TrainingData(id)
+    
+    override
+    def readEval(sc: SparkContext)
+    : Seq[(TrainingData, EvalInfo, RDD[(Query, Actual)])] = {
+      (0 until params.en).map { ex => {
+        val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => {
+          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
+        }}
+        (TrainingData(id), EvalInfo(id), sc.parallelize(qaSeq))
+      }}
+    }
+  }
+  
+  class LDataSource0(id: Int, en: Int = 0, qn: Int = 0) 
+    extends LDataSource[TrainingData, EvalInfo, Query, Actual] {
+    def readTraining(): TrainingData = TrainingData(id)
+   
+    override
+    def readEval()
+    : Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = {
+      (0 until en).map { ex => {
+        val qaSeq: Seq[(Query, Actual)] = (0 until qn).map { qx => {
+          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
+        }}
+        (TrainingData(id), EvalInfo(id), qaSeq)
+      }}
+    }
+  }
+  
+  object LDataSource1 {
+    case class Params(id: Int, en: Int = 0, qn: Int = 0) extends PIOParams
+  }
+  
+  class LDataSource1(params: LDataSource1.Params)
+  extends LDataSource[TrainingData, EvalInfo, Query, Actual] {
+    val id = params.id
+    def readTraining(): TrainingData = TrainingData(id)
+    
+    override
+    def readEval(): Seq[(TrainingData, EvalInfo, Seq[(Query, Actual)])] = {
+      (0 until params.en).map { ex => {
+        val qaSeq: Seq[(Query, Actual)] = (0 until params.qn).map { qx => {
+          (Query(id, ex=ex, qx=qx), Actual(id, ex, qx))
+        }}
+        (TrainingData(id), EvalInfo(id), qaSeq)
+      }}
+    }
+  }
+  
+  class PPreparator0(id: Int = 0)
+  extends PPreparator[TrainingData, ProcessedData] {
+    def prepare(sc: SparkContext, td: TrainingData): ProcessedData = {
+      ProcessedData(id, td)
+    }
+  }
+
+  object PPreparator1 {
+    case class Params(id: Int  = 0) extends PIOParams
+  }
+
+  class PPreparator1(params: PPreparator1.Params)
+  extends PPreparator[TrainingData, ProcessedData] {
+    def prepare(sc: SparkContext, td: TrainingData): ProcessedData = {
+      ProcessedData(params.id, td)
+    }
+  }
+
+  class LPreparator0(id: Int = 0) 
+  extends LPreparator[TrainingData, ProcessedData] {
+    def prepare(td: TrainingData): ProcessedData = {
+      ProcessedData(id, td)
+    }
+  }
+  
+  object LPreparator1 {
+    case class Params(id: Int  = 0) extends PIOParams
+  }
+
+  class LPreparator1(params: LPreparator1.Params)
+  extends LPreparator[TrainingData, ProcessedData] {
+    def prepare(td: TrainingData): ProcessedData = {
+      ProcessedData(params.id, td)
+    }
+  }
+
+  object PAlgo0 {
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class PAlgo0(id: Int = 0)
+  extends PAlgorithm[ProcessedData, PAlgo0.Model, Query, Prediction] {
+    def train(sc: SparkContext, pd: ProcessedData)
+    : PAlgo0.Model = PAlgo0.Model(id, pd)
+
+    override
+    def batchPredict(m: PAlgo0.Model, qs: RDD[(Long, Query)])
+    : RDD[(Long, Prediction)] = {
+      qs.mapValues(q => Prediction(id, q, Some(m)))
+    }
+    
+    def predict(m: PAlgo0.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+
+  object PAlgo1 {
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class PAlgo1(id: Int = 0)
+  extends PAlgorithm[ProcessedData, PAlgo1.Model, Query, Prediction] {
+    def train(sc: SparkContext, pd: ProcessedData)
+    : PAlgo1.Model = PAlgo1.Model(id, pd)
+
+    override
+    def batchPredict(m: PAlgo1.Model, qs: RDD[(Long, Query)])
+    : RDD[(Long, Prediction)] = {
+      qs.mapValues(q => Prediction(id, q, Some(m)))
+    }
+
+    def predict(m: PAlgo1.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+  
+  object PAlgo2 {
+    case class Model(id: Int, pd: ProcessedData)
+    case class Params(id: Int) extends PIOParams
+  }
+
+  class PAlgo2(params: PAlgo2.Params)
+  extends PAlgorithm[ProcessedData, PAlgo2.Model, Query, Prediction] {
+    val id = params.id
+
+    def train(sc: SparkContext, pd: ProcessedData)
+    : PAlgo2.Model = PAlgo2.Model(id, pd)
+
+    override
+    def batchPredict(m: PAlgo2.Model, qs: RDD[(Long, Query)])
+    : RDD[(Long, Prediction)] = {
+      qs.mapValues(q => Prediction(id, q, Some(m)))
+    }
+
+    def predict(m: PAlgo2.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+  
+  object PAlgo3 {
+    case class Model(id: Int, pd: ProcessedData)
+    extends LocalFileSystemPersistentModel[Params]
+    
+    object Model extends LocalFileSystemPersistentModelLoader[Params, Model]
+
+    case class Params(id: Int) extends PIOParams
+  }
+
+  class PAlgo3(params: PAlgo3.Params)
+  extends PAlgorithm[ProcessedData, PAlgo3.Model, Query, Prediction] {
+    val id = params.id
+
+    def train(sc: SparkContext, pd: ProcessedData)
+    : PAlgo3.Model = PAlgo3.Model(id, pd)
+
+    override
+    def batchPredict(m: PAlgo3.Model, qs: RDD[(Long, Query)])
+    : RDD[(Long, Prediction)] = {
+      qs.mapValues(q => Prediction(id, q, Some(m)))
+    }
+
+    def predict(m: PAlgo3.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+  
+  object LAlgo0 {
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class LAlgo0(id: Int = 0) 
+  extends LAlgorithm[ProcessedData, LAlgo0.Model, Query, Prediction] {
+    def train(pd: ProcessedData): LAlgo0.Model = LAlgo0.Model(id, pd)
+
+    def predict(m: LAlgo0.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+  
+  object LAlgo1 {
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class LAlgo1(id: Int = 0) 
+  extends LAlgorithm[ProcessedData, LAlgo1.Model, Query, Prediction] {
+    def train(pd: ProcessedData): LAlgo1.Model = LAlgo1.Model(id, pd)
+    
+    def predict(m: LAlgo1.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+  
+  object LAlgo2 {
+    case class Params(id: Int) extends PIOParams
+
+    case class Model(id: Int, pd: ProcessedData)
+    extends LocalFileSystemPersistentModel[EmptyParams]
+    
+    object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model]
+  }
+
+  class LAlgo2(params: LAlgo2.Params) 
+  extends LAlgorithm[ProcessedData, LAlgo2.Model, Query, Prediction] {
+    def train(pd: ProcessedData): LAlgo2.Model = LAlgo2.Model(params.id, pd)
+    
+    def predict(m: LAlgo2.Model, q: Query): Prediction = {
+      Prediction(params.id, q, Some(m))
+    }
+  }
+
+  object LAlgo3 {
+    case class Params(id: Int) extends PIOParams
+
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class LAlgo3(params: LAlgo3.Params) 
+  extends LAlgorithm[ProcessedData, LAlgo3.Model, Query, Prediction] {
+    def train(pd: ProcessedData): LAlgo3.Model = LAlgo3.Model(params.id, pd)
+    
+    def predict(m: LAlgo3.Model, q: Query): Prediction = {
+      Prediction(params.id, q, Some(m))
+    }
+  }
+
+  // N : P2L. The name N is used because it sits between L and P alphabetically.
+  object NAlgo0 {
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class NAlgo0 (id: Int = 0)
+  extends P2LAlgorithm[ProcessedData, NAlgo0.Model, Query, Prediction] {
+    def train(sc: SparkContext, pd: ProcessedData)
+    : NAlgo0.Model = NAlgo0.Model(id, pd)
+  
+    def predict(m: NAlgo0.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+
+  object NAlgo1 {
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class NAlgo1 (id: Int = 0)
+  extends P2LAlgorithm[ProcessedData, NAlgo1.Model, Query, Prediction] {
+    def train(sc: SparkContext, pd: ProcessedData)
+    : NAlgo1.Model = NAlgo1.Model(id, pd)
+   
+    def predict(m: NAlgo1.Model, q: Query): Prediction = {
+      Prediction(id, q, Some(m))
+    }
+  }
+  
+  object NAlgo2 {
+    case class Params(id: Int) extends PIOParams
+
+    case class Model(id: Int, pd: ProcessedData)
+    extends LocalFileSystemPersistentModel[EmptyParams]
+    
+    object Model extends LocalFileSystemPersistentModelLoader[EmptyParams, Model]
+  }
+
+  class NAlgo2(params: NAlgo2.Params) 
+  extends P2LAlgorithm[ProcessedData, NAlgo2.Model, Query, Prediction] {
+    def train(sc: SparkContext, pd: ProcessedData)
+    : NAlgo2.Model = NAlgo2.Model(params.id, pd)
+    
+    def predict(m: NAlgo2.Model, q: Query): Prediction = {
+      Prediction(params.id, q, Some(m))
+    }
+  }
+
+  object NAlgo3 {
+    case class Params(id: Int) extends PIOParams
+
+    case class Model(id: Int, pd: ProcessedData)
+  }
+
+  class NAlgo3(params: NAlgo3.Params) 
+  extends P2LAlgorithm[ProcessedData, NAlgo3.Model, Query, Prediction] {
+    def train(sc: SparkContext, pd: ProcessedData)
+    : NAlgo3.Model = NAlgo3.Model(params.id, pd)
+    
+    def predict(m: NAlgo3.Model, q: Query): Prediction = {
+      Prediction(params.id, q, Some(m))
+    }
+  }
+
+  class LServing0(id: Int = 0) extends LServing[Query, Prediction] {
+    def serve(q: Query, ps: Seq[Prediction]): Prediction = {
+      Prediction(id, q, ps=ps)
+    }
+  }
+
+  object LServing1 {
+    case class Params(id: Int) extends PIOParams
+  }
+  
+  class LServing1(params: LServing1.Params) extends LServing[Query, Prediction] {
+    def serve(q: Query, ps: Seq[Prediction]): Prediction = {
+      Prediction(params.id, q, ps=ps)
+    }
+  }
+  
+  class LServing2(id: Int) extends LServing[Query, Prediction] {
+    override
+    def supplement(q: Query): Query = q.copy(supp = true)
+
+    def serve(q: Query, ps: Seq[Prediction]): Prediction = {
+      Prediction(id, q, ps=ps)
+    }
+  }
+}
+
+object Engine1 {
+  case class EvalInfo(v: Double) extends Serializable
+  case class Query() extends Serializable
+  case class Prediction() extends Serializable
+  case class Actual() extends Serializable
+  case class DSP(v: Double) extends Params
+}
+
+class Engine1 
+extends BaseEngine[
+  Engine1.EvalInfo, Engine1.Query, Engine1.Prediction, Engine1.Actual] {
+
+  def train(
+    sc: SparkContext, 
+    engineParams: EngineParams,
+    engineInstanceId: String = "",
+    params: WorkflowParams = WorkflowParams()): Seq[Any] = Seq[Any]()
+
+  def eval(sc: SparkContext, engineParams: EngineParams, params: WorkflowParams)
+  : Seq[(Engine1.EvalInfo, 
+      RDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)])] = {
+    val dsp = engineParams.dataSourceParams._2.asInstanceOf[Engine1.DSP]
+    Seq(
+      (Engine1.EvalInfo(dsp.v),
+        sc.emptyRDD[(Engine1.Query, Engine1.Prediction, Engine1.Actual)]))
+  }
+}
+
+
+class Metric0
+extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction,
+Engine1.Actual, Double] {
+  override def header: String = "Metric0"
+
+  def calculate(
+    sc: SparkContext, 
+    evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction,
+    Engine1.Actual)])]): Double = {
+    evalDataSet.head._1.v
+  }
+}
+
+object Metric1 {
+  case class Result(c: Int, v: Double) extends Serializable
+}
+
+class Metric1
+extends Metric[Engine1.EvalInfo, Engine1.Query, Engine1.Prediction,
+Engine1.Actual, Metric1.Result]()(Ordering.by[Metric1.Result, Double](_.v)) {
+  override def header: String = "Metric1"
+
+  def calculate(
+    sc: SparkContext, 
+    evalDataSet: Seq[(Engine1.EvalInfo, RDD[(Engine1.Query, Engine1.Prediction,
+    Engine1.Actual)])]): Metric1.Result = {
+    Metric1.Result(0, evalDataSet.head._1.v)
+  }
+}
+
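
The Engine0 components above mirror what a production engine wires together. As
a rough usage sketch, not part of this commit and assuming the single-class
Engine constructor accepts the component classes in this order (as the
controller API's templates do), the sample pieces could be assembled like this:

    import org.apache.predictionio.controller.Engine
    import org.apache.predictionio.controller.Engine0._

    // Hypothetical wiring of the sample components into an Engine, with a
    // named algorithm map as the test suites use.
    object SampleEngineFactory {
      def apply() = new Engine(
        classOf[PDataSource2],
        classOf[PPreparator1],
        Map("PAlgo2" -> classOf[PAlgo2]),
        classOf[LServing1])
    }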

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala b/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala
new file mode 100644
index 0000000..df36620
--- /dev/null
+++ b/core/src/test/scala/org/apache/predictionio/workflow/BaseTest.scala
@@ -0,0 +1,75 @@
+//package org.apache.spark
+package org.apache.predictionio.workflow
+
+import _root_.io.netty.util.internal.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Suite
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+
+
+/** Manages a local `sc` {@link SparkContext} variable, correctly stopping it
+  * after each test. */
+trait LocalSparkContext 
+extends BeforeAndAfterEach with BeforeAndAfterAll { self: Suite =>
+
+  @transient var sc: SparkContext = _
+
+  override def beforeAll() {
+    InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
+    super.beforeAll()
+  }
+
+  override def afterEach() {
+    resetSparkContext()
+    super.afterEach()
+  }
+
+  def resetSparkContext() = {
+    LocalSparkContext.stop(sc)
+    sc = null
+  }
+
+}
+
+object LocalSparkContext {
+  def stop(sc: SparkContext) {
+    if (sc != null) {
+      sc.stop()
+    }
+    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+    System.clearProperty("spark.driver.port")
+  }
+
+  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+  def withSpark[T](sc: SparkContext)(f: SparkContext => T) = {
+    try {
+      f(sc)
+    } finally {
+      stop(sc)
+    }
+  }
+
+}
+/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
+trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
+
+  @transient private var _sc: SparkContext = _
+
+  def sc: SparkContext = _sc
+
+  var conf = new SparkConf(false)
+
+  override def beforeAll() {
+    _sc = new SparkContext("local[4]", "test", conf)
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    LocalSparkContext.stop(_sc)
+    _sc = null
+    super.afterAll()
+  }
+}
+
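
As a usage sketch (a hypothetical suite, not part of this commit), any ScalaTest
suite in this module can obtain a shared context simply by mixing the trait in
and referring to sc:

    import org.apache.predictionio.workflow.SharedSparkContext
    import org.scalatest.FunSuite

    // Hypothetical example: the shared SparkContext is created once in
    // beforeAll and stopped in afterAll by the trait itself.
    class WordCountSuite extends FunSuite with SharedSparkContext {
      test("counts elements with the shared context") {
        assert(sc.parallelize(1 to 10).count() == 10)
      }
    }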

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/org/apache/predictionio/workflow/EngineWorkflowTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/predictionio/workflow/EngineWorkflowTest.scala b/core/src/test/scala/org/apache/predictionio/workflow/EngineWorkflowTest.scala
new file mode 100644
index 0000000..e69de29

