http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/EngineParams.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/EngineParams.scala b/core/src/main/scala/io/prediction/controller/EngineParams.scala
deleted file mode 100644
index 32f5de7..0000000
--- a/core/src/main/scala/io/prediction/controller/EngineParams.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseDataSource
-import io.prediction.core.BaseAlgorithm
-
-import scala.collection.JavaConversions
-import scala.language.implicitConversions
-
-/** This class serves as a logical grouping of all of an engine's required parameters.
-  *
-  * @param dataSourceParams Data Source name-parameters tuple.
-  * @param preparatorParams Preparator name-parameters tuple.
-  * @param algorithmParamsList List of algorithm name-parameter pairs.
-  * @param servingParams Serving name-parameters tuple.
-  * @group Engine
-  */
-class EngineParams(
-    val dataSourceParams: (String, Params) = ("", EmptyParams()),
-    val preparatorParams: (String, Params) = ("", EmptyParams()),
-    val algorithmParamsList: Seq[(String, Params)] = Seq(),
-    val servingParams: (String, Params) = ("", EmptyParams()))
-  extends Serializable {
-
-  /** Java-friendly constructor
-    *
-    * @param dataSourceName Data Source name
-    * @param dataSourceParams Data Source parameters
-    * @param preparatorName Preparator name
-    * @param preparatorParams Preparator parameters
-    * @param algorithmParamsList Map of algorithm name-parameters
-    * @param servingName Serving name
-    * @param servingParams Serving parameters
-    */
-  def this(
-    dataSourceName: String,
-    dataSourceParams: Params,
-    preparatorName: String,
-    preparatorParams: Params,
-    algorithmParamsList: _root_.java.util.Map[String, _ <: Params],
-    servingName: String,
-    servingParams: Params) = {
-
-    // To work around a weird json4s limitation, the parameter names cannot be changed
-    this(
-      (dataSourceName, dataSourceParams),
-      (preparatorName, preparatorParams),
-      JavaConversions.mapAsScalaMap(algorithmParamsList).toSeq,
-      (servingName, servingParams)
-    )
-  }
-
-  // A case class style copy method.
-  def copy(
-    dataSourceParams: (String, Params) = dataSourceParams,
-    preparatorParams: (String, Params) = preparatorParams,
-    algorithmParamsList: Seq[(String, Params)] = algorithmParamsList,
-    servingParams: (String, Params) = servingParams): EngineParams = {
-
-    new EngineParams(
-      dataSourceParams,
-      preparatorParams,
-      algorithmParamsList,
-      servingParams)
-  }
-}
-
-/** Companion object for creating [[EngineParams]] instances.
-  *
-  * @group Engine
-  */
-object EngineParams {
-  /** Create EngineParams.
-    *
-    * @param dataSourceName Data Source name
-    * @param dataSourceParams Data Source parameters
-    * @param preparatorName Preparator name
-    * @param preparatorParams Preparator parameters
-    * @param algorithmParamsList List of algorithm name-parameter pairs.
-    * @param servingName Serving name
-    * @param servingParams Serving parameters
-    */
-  def apply(
-    dataSourceName: String = "",
-    dataSourceParams: Params = EmptyParams(),
-    preparatorName: String = "",
-    preparatorParams: Params = EmptyParams(),
-    algorithmParamsList: Seq[(String, Params)] = Seq(),
-    servingName: String = "",
-    servingParams: Params = EmptyParams()): EngineParams = {
-      new EngineParams(
-        dataSourceParams = (dataSourceName, dataSourceParams),
-        preparatorParams = (preparatorName, preparatorParams),
-        algorithmParamsList = algorithmParamsList,
-        servingParams = (servingName, servingParams)
-      )
-    }
-}
-
-/** SimpleEngine has only one algorithm, and uses default preparator and serving
-  * layer. Current default preparator is `IdentityPreparator` and serving is
-  * `FirstServing`.
-  *
-  * @tparam TD Training data class.
-  * @tparam EI Evaluation info class.
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @tparam A Actual value class.
-  * @param dataSourceClass Data source class.
-  * @param algorithmClass Algorithm class.
-  * @group Engine
-  */
-class SimpleEngine[TD, EI, Q, P, A](
-    dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
-    algorithmClass: Class[_ <: BaseAlgorithm[TD, _, Q, P]])
-  extends Engine(
-    dataSourceClass,
-    IdentityPreparator(dataSourceClass),
-    Map("" -> algorithmClass),
-    LFirstServing(algorithmClass))
-
-/** This shorthand class provides the parameters for the `SimpleEngine` class.
-  *
-  * @param dataSourceParams Data source parameters.
-  * @param algorithmParams List of algorithm name-parameter pairs.
-  * @group Engine
-  */
-class SimpleEngineParams(
-    dataSourceParams: Params = EmptyParams(),
-    algorithmParams: Params = EmptyParams())
-  extends EngineParams(
-    dataSourceParams = ("", dataSourceParams),
-    algorithmParamsList = Seq(("", algorithmParams)))
-
-
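
For reference, a minimal usage sketch of the removed EngineParams factory; the MyAlgoParams case class and the "als" algorithm name are hypothetical:

    import io.prediction.controller.{EngineParams, Params}

    // Hypothetical algorithm parameter class; any Params subclass works here.
    case class MyAlgoParams(rank: Int, iterations: Int) extends Params

    // Components left unspecified fall back to the empty name / EmptyParams defaults.
    val engineParams = EngineParams(
      algorithmParamsList = Seq(("als", MyAlgoParams(rank = 10, iterations = 20))))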

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala b/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala
deleted file mode 100644
index a9bf3eb..0000000
--- a/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import scala.language.implicitConversions
-
-/** Defines an engine parameters generator.
-  *
-  * Implementations of this trait can be supplied to "pio eval" as the second
-  * command line argument.
-  *
-  * @group Evaluation
-  */
-trait EngineParamsGenerator {
-  protected[this] var epList: Seq[EngineParams] = _
-  protected[this] var epListSet: Boolean = false
-
-  /** Returns the list of [[EngineParams]] of this [[EngineParamsGenerator]]. */
-  def engineParamsList: Seq[EngineParams] = {
-    assert(epListSet, "EngineParamsList not set")
-    epList
-  }
-
-  /** Sets the list of [[EngineParams]] of this [[EngineParamsGenerator]]. */
-  def engineParamsList_=(l: Seq[EngineParams]) {
-    assert(!epListSet, "EngineParamsList can be set at most once")
-    epList = Seq(l:_*)
-    epListSet = true
-  }
-}
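
For reference, a minimal sketch of an EngineParamsGenerator implementation as supplied to "pio eval"; MyAlgoParams and the "als" name are hypothetical:

    import io.prediction.controller.{EngineParams, EngineParamsGenerator, Params}

    case class MyAlgoParams(rank: Int) extends Params  // hypothetical

    // The engineParamsList setter above may be invoked at most once.
    object MyEngineParamsList extends EngineParamsGenerator {
      engineParamsList = Seq(10, 20, 40).map { rank =>
        EngineParams(algorithmParamsList = Seq(("als", MyAlgoParams(rank))))
      }
    }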

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Evaluation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/Evaluation.scala b/core/src/main/scala/io/prediction/controller/Evaluation.scala
deleted file mode 100644
index a6ee9a7..0000000
--- a/core/src/main/scala/io/prediction/controller/Evaluation.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseEngine
-import io.prediction.core.BaseEvaluator
-import io.prediction.core.BaseEvaluatorResult
-
-import scala.language.implicitConversions
-
-/** Defines an evaluation that contains an engine and a metric.
-  *
-  * Implementations of this trait can be supplied to "pio eval" as the first
-  * argument.
-  *
-  * @group Evaluation
-  */
-trait Evaluation extends Deployment {
-  protected [this] var _evaluatorSet: Boolean = false
-  protected [this] var _evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = _
-
-  private [prediction]
-  def evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = {
-    assert(_evaluatorSet, "Evaluator not set")
-    _evaluator
-  }
-
-  /** Gets the tuple of the [[Engine]] and the implementation of
-    * [[io.prediction.core.BaseEvaluator]]
-    */
-  def engineEvaluator
-  : (BaseEngine[_, _, _, _], BaseEvaluator[_, _, _, _, _]) = {
-    assert(_evaluatorSet, "Evaluator not set")
-    (engine, _evaluator)
-  }
-
-  /** Sets both an [[Engine]] and an implementation of
-    * [[io.prediction.core.BaseEvaluator]] for this [[Evaluation]]
-    *
-    * @param engineEvaluator A tuple of an [[Engine]] and an implementation of
-    *                        [[io.prediction.core.BaseEvaluator]]
-    * @tparam EI Evaluation information class
-    * @tparam Q Query class
-    * @tparam P Predicted result class
-    * @tparam A Actual result class
-    * @tparam R Metric result class
-    */
-  def engineEvaluator_=[EI, Q, P, A, R <: BaseEvaluatorResult](
-    engineEvaluator: (
-      BaseEngine[EI, Q, P, A],
-      BaseEvaluator[EI, Q, P, A, R])) {
-    assert(!_evaluatorSet, "Evaluator can be set at most once")
-    engine = engineEvaluator._1
-    _evaluator = engineEvaluator._2
-    _evaluatorSet = true
-  }
-
-  /** Returns both the [[Engine]] and the implementation of [[Metric]] for this
-    * [[Evaluation]]
-    */
-  def engineMetric: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = {
-    throw new NotImplementedError("This method is to keep the compiler happy")
-  }
-
-  /** Sets both an [[Engine]] and an implementation of [[Metric]] for this
-    * [[Evaluation]]
-    *
-    * @param engineMetric A tuple of [[Engine]] and an implementation of
-    *                     [[Metric]]
-    * @tparam EI Evaluation information class
-    * @tparam Q Query class
-    * @tparam P Predicted result class
-    * @tparam A Actual result class
-    */
-  def engineMetric_=[EI, Q, P, A](
-    engineMetric: (BaseEngine[EI, Q, P, A], Metric[EI, Q, P, A, _])) {
-    engineEvaluator = (
-      engineMetric._1,
-      MetricEvaluator(
-        metric = engineMetric._2,
-        otherMetrics = Seq[Metric[EI, Q, P, A, _]](),
-        outputPath = "best.json"))
-  }
-
-  private [prediction]
-  def engineMetrics: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = {
-    throw new NotImplementedError("This method is to keep the compiler happy")
-  }
-
-  /** Sets an [[Engine]], an implementation of [[Metric]], and a sequence of
-    * implementations of [[Metric]] for this [[Evaluation]]
-    *
-    * @param engineMetrics A tuple of an [[Engine]], an implementation of
-    *                      [[Metric]], and a sequence of implementations of [[Metric]]
-    * @tparam EI Evaluation information class
-    * @tparam Q Query class
-    * @tparam P Predicted result class
-    * @tparam A Actual result class
-    */
-  def engineMetrics_=[EI, Q, P, A](
-    engineMetrics: (
-      BaseEngine[EI, Q, P, A],
-      Metric[EI, Q, P, A, _],
-      Seq[Metric[EI, Q, P, A, _]])) {
-    engineEvaluator = (
-      engineMetrics._1,
-      MetricEvaluator(engineMetrics._2, engineMetrics._3))
-  }
-}
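
For reference, a hedged sketch of how the removed Evaluation trait was typically used; MyEngineFactory and MyAccuracy are hypothetical names standing in for a concrete engine factory and Metric implementation:

    import io.prediction.controller.Evaluation

    // Assigning to engineMetric wraps the metric in a MetricEvaluator,
    // as shown in engineMetric_= above.
    object MyEvaluation extends Evaluation {
      engineMetric = (MyEngineFactory(), new MyAccuracy())
    }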

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala b/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala
deleted file mode 100644
index 8e9727e..0000000
--- a/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala
+++ /dev/null
@@ -1,343 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseDataSource
-import io.prediction.core.BasePreparator
-import io.prediction.core.BaseAlgorithm
-import io.prediction.core.BaseServing
-import io.prediction.core.Doer
-import io.prediction.annotation.Experimental
-
-import grizzled.slf4j.Logger
-import io.prediction.workflow.WorkflowParams
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import scala.language.implicitConversions
-
-import _root_.java.util.NoSuchElementException
-
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-
-/** :: Experimental ::
-  * Workflow based on [[FastEvalEngine]]
-  *
-  * @group Evaluation
-  */
-@Experimental
-object FastEvalEngineWorkflow  {
-  @transient lazy val logger = Logger[this.type]
-
-  type EX = Int
-  type AX = Int
-  type QX = Long
-
-  case class DataSourcePrefix(dataSourceParams: (String, Params)) {
-    def this(pp: PreparatorPrefix) = this(pp.dataSourceParams)
-    def this(ap: AlgorithmsPrefix) = this(ap.dataSourceParams)
-    def this(sp: ServingPrefix) = this(sp.dataSourceParams)
-  }
-
-  case class PreparatorPrefix(
-    dataSourceParams: (String, Params),
-    preparatorParams: (String, Params)) {
-    def this(ap: AlgorithmsPrefix) = {
-      this(ap.dataSourceParams, ap.preparatorParams)
-    }
-  }
-
-  case class AlgorithmsPrefix(
-    dataSourceParams: (String, Params),
-    preparatorParams: (String, Params),
-    algorithmParamsList: Seq[(String, Params)]) {
-    def this(sp: ServingPrefix) = {
-      this(sp.dataSourceParams, sp.preparatorParams, sp.algorithmParamsList)
-    }
-  }
-
-  case class ServingPrefix(
-    dataSourceParams: (String, Params),
-    preparatorParams: (String, Params),
-    algorithmParamsList: Seq[(String, Params)],
-    servingParams: (String, Params)) {
-    def this(ep: EngineParams) = this(
-      ep.dataSourceParams,
-      ep.preparatorParams,
-      ep.algorithmParamsList,
-      ep.servingParams)
-  }
-
-  def getDataSourceResult[TD, EI, PD, Q, P, A](
-    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
-    prefix: DataSourcePrefix)
-  : Map[EX, (TD, EI, RDD[(QX, (Q, A))])] = {
-    val cache = workflow.dataSourceCache
-
-    if (!cache.contains(prefix)) {
-      val dataSource = Doer(
-        workflow.engine.dataSourceClassMap(prefix.dataSourceParams._1),
-        prefix.dataSourceParams._2)
-
-      val result = dataSource
-      .readEvalBase(workflow.sc)
-      .map { case (td, ei, qaRDD) => {
-        (td, ei, qaRDD.zipWithUniqueId().map(_.swap))
-      }}
-      .zipWithIndex
-      .map(_.swap)
-      .toMap
-
-      cache += Tuple2(prefix, result)
-    }
-    cache(prefix)
-  }
-
-  def getPreparatorResult[TD, EI, PD, Q, P, A](
-    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
-    prefix: PreparatorPrefix): Map[EX, PD] = {
-    val cache = workflow.preparatorCache
-
-    if (!cache.contains(prefix)) {
-      val preparator = Doer(
-        workflow.engine.preparatorClassMap(prefix.preparatorParams._1),
-        prefix.preparatorParams._2)
-
-      val result = getDataSourceResult(
-        workflow = workflow,
-        prefix = new DataSourcePrefix(prefix))
-      .mapValues { case (td, _, _) => preparator.prepareBase(workflow.sc, td) }
-
-      cache += Tuple2(prefix, result)
-    }
-    cache(prefix)
-  }
-
-  def computeAlgorithmsResult[TD, EI, PD, Q, P, A](
-    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
-    prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
-
-    val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = prefix.algorithmParamsList
-      .map { case (algoName, algoParams) => {
-        try {
-          Doer(workflow.engine.algorithmClassMap(algoName), algoParams)
-        } catch {
-          case e: NoSuchElementException => {
-            val algorithmClassMap = workflow.engine.algorithmClassMap
-            if (algoName == "") {
-              logger.error("Empty algorithm name supplied but it could not " +
-                "match with any algorithm in the engine's definition. " +
-                "Existing algorithm name(s) are: " +
-                s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
-            } else {
-              logger.error(s"${algoName} cannot be found in the engine's " +
-                "definition. Existing algorithm name(s) are: " +
-                s"${algorithmClassMap.keys.mkString(", ")}. Aborting.")
-            }
-            sys.exit(1)
-          }
-        }
-      }}
-      .zipWithIndex
-      .map(_.swap)
-      .toMap
-
-    val algoCount = algoMap.size
-
-    // Model Train
-    val algoModelsMap: Map[EX, Map[AX, Any]] = getPreparatorResult(
-      workflow,
-      new PreparatorPrefix(prefix))
-    .mapValues {
-      pd => algoMap.mapValues(_.trainBase(workflow.sc,pd))
-    }
-
-    // Predict
-    val dataSourceResult =
-      FastEvalEngineWorkflow.getDataSourceResult(
-        workflow = workflow,
-        prefix = new DataSourcePrefix(prefix))
-
-    val algoResult: Map[EX, RDD[(QX, Seq[P])]] = dataSourceResult
-    .par
-    .map { case (ex, (td, ei, iqaRDD)) => {
-      val modelsMap: Map[AX, Any] = algoModelsMap(ex)
-      val qs: RDD[(QX, Q)] = iqaRDD.mapValues(_._1)
-
-      val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount)
-      .map { ax => {
-        val algo = algoMap(ax)
-        val model = modelsMap(ax)
-        val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(
-          workflow.sc,
-          model,
-          qs)
-
-        val predicts: RDD[(QX, (AX, P))] = rawPredicts.map {
-          case (qx, p) => (qx, (ax, p))
-        }
-        predicts
-      }}
-
-      val unionAlgoPredicts: RDD[(QX, Seq[P])] = workflow.sc
-      .union(algoPredicts)
-      .groupByKey
-      .mapValues { ps => {
-        assert (ps.size == algoCount, "Must have same length as algoCount")
-        // TODO. Check size == algoCount
-        ps.toSeq.sortBy(_._1).map(_._2)
-      }}
-      (ex, unionAlgoPredicts)
-    }}
-    .seq
-    .toMap
-
-    algoResult
-  }
-
-  def getAlgorithmsResult[TD, EI, PD, Q, P, A](
-    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
-    prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = {
-    val cache = workflow.algorithmsCache
-    if (!cache.contains(prefix)) {
-      val result = computeAlgorithmsResult(workflow, prefix)
-      cache += Tuple2(prefix, result)
-    }
-    cache(prefix)
-  }
-
-  def getServingResult[TD, EI, PD, Q, P, A](
-    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
-    prefix: ServingPrefix)
-  : Seq[(EI, RDD[(Q, P, A)])] = {
-    val cache = workflow.servingCache
-    if (!cache.contains(prefix)) {
-      val serving = Doer(
-        workflow.engine.servingClassMap(prefix.servingParams._1),
-        prefix.servingParams._2)
-
-      val algoPredictsMap = getAlgorithmsResult(
-        workflow = workflow,
-        prefix = new AlgorithmsPrefix(prefix))
-
-      val dataSourceResult = getDataSourceResult(
-        workflow = workflow,
-        prefix = new DataSourcePrefix(prefix))
-
-      val evalQAsMap = dataSourceResult.mapValues(_._3)
-      val evalInfoMap = dataSourceResult.mapValues(_._2)
-
-      val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap
-      .map { case (ex, psMap) => {
-        val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex)
-        val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap)
-        .map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) }
-
-        val qpaMap: RDD[(Q, P, A)] = qpsaMap.map {
-          case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a)
-        }
-        (ex, qpaMap)
-      }}
-
-      val servingResult = (0 until evalQAsMap.size).map { ex => {
-        (evalInfoMap(ex), servingQPAMap(ex))
-      }}
-      .toSeq
-
-      cache += Tuple2(prefix, servingResult)
-    }
-    cache(prefix)
-  }
-
-  def get[TD, EI, PD, Q, P, A](
-    workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A],
-    engineParamsList: Seq[EngineParams])
-  : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
-    engineParamsList.map { engineParams => {
-      (engineParams,
-        getServingResult(workflow, new ServingPrefix(engineParams)))
-    }}
-  }
-}
-
-/** :: Experimental ::
-  * Workflow based on [[FastEvalEngine]]
-  *
-  * @group Evaluation
-  */
-@Experimental
-class FastEvalEngineWorkflow[TD, EI, PD, Q, P, A](
-  val engine: FastEvalEngine[TD, EI, PD, Q, P, A],
-  val sc: SparkContext,
-  val workflowParams: WorkflowParams) extends Serializable {
-
-  import io.prediction.controller.FastEvalEngineWorkflow._
-
-  type DataSourceResult = Map[EX, (TD, EI, RDD[(QX, (Q, A))])]
-  type PreparatorResult = Map[EX, PD]
-  type AlgorithmsResult = Map[EX, RDD[(QX, Seq[P])]]
-  type ServingResult = Seq[(EI, RDD[(Q, P, A)])]
-
-  val dataSourceCache = MutableHashMap[DataSourcePrefix, DataSourceResult]()
-  val preparatorCache = MutableHashMap[PreparatorPrefix, PreparatorResult]()
-  val algorithmsCache = MutableHashMap[AlgorithmsPrefix, AlgorithmsResult]()
-  val servingCache = MutableHashMap[ServingPrefix, ServingResult]()
-}
-
-
-
-/** :: Experimental ::
-  * FastEvalEngine is a subclass of [[Engine]] that exploits the immutability of
-  * controllers to optimize the evaluation process
-  *
-  * @group Evaluation
-  */
-@Experimental
-class FastEvalEngine[TD, EI, PD, Q, P, A](
-    dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]],
-    preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]],
-    algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]],
-    servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]])
-  extends Engine[TD, EI, PD, Q, P, A](
-    dataSourceClassMap,
-    preparatorClassMap,
-    algorithmClassMap,
-    servingClassMap) {
-  @transient override lazy val logger = Logger[this.type]
-
-  override def eval(
-    sc: SparkContext,
-    engineParams: EngineParams,
-    params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] = {
-    logger.info("FastEvalEngine.eval")
-    batchEval(sc, Seq(engineParams), params).head._2
-  }
-
-  override def batchEval(
-    sc: SparkContext,
-    engineParamsList: Seq[EngineParams],
-    params: WorkflowParams)
-  : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
-
-    val fastEngineWorkflow = new FastEvalEngineWorkflow(
-      this, sc, params)
-
-    FastEvalEngineWorkflow.get(
-      fastEngineWorkflow,
-      engineParamsList)
-  }
-}
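
For reference, a hedged sketch of constructing the removed FastEvalEngine; the My* controller classes are hypothetical:

    import io.prediction.controller.FastEvalEngine

    // Wired exactly like a regular Engine, but batchEval reuses the cached
    // data source, preparator and algorithm results across the evaluated
    // engine parameter sets.
    val evalEngine = new FastEvalEngine(
      Map("" -> classOf[MyDataSource]),
      Map("" -> classOf[MyPreparator]),
      Map("" -> classOf[MyAlgorithm]),
      Map("" -> classOf[MyServing]))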

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala b/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala
deleted file mode 100644
index 0bf3cb0..0000000
--- a/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseDataSource
-import io.prediction.core.BasePreparator
-import org.apache.spark.SparkContext
-
-import scala.reflect._
-
-/** A helper concrete implementation of [[io.prediction.core.BasePreparator]]
-  * that passes training data through without any special preparation. This can
-  * be used in place of both [[PPreparator]] and [[LPreparator]].
-  *
-  * @tparam TD Training data class.
-  * @group Preparator
-  */
-class IdentityPreparator[TD] extends BasePreparator[TD, TD] {
-  def prepareBase(sc: SparkContext, td: TD): TD = td
-}
-
-/** Companion object of [[IdentityPreparator]] that conveniently returns an
-  * instance of the class of [[IdentityPreparator]] for use with
-  * [[EngineFactory]].
-  *
-  * @group Preparator
-  */
-object IdentityPreparator {
-  /** Produces an instance of the class of [[IdentityPreparator]].
-    *
-    * @param ds Instance of the class of the data source for this preparator.
-    */
-  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
-    classOf[IdentityPreparator[TD]]
-}
-
-/** DEPRECATED. Use [[IdentityPreparator]] instead.
-  *
-  * @tparam TD Training data class.
-  * @group Preparator
-  */
-@deprecated("Use IdentityPreparator instead.", "0.9.2")
-class PIdentityPreparator[TD] extends IdentityPreparator[TD]
-
-/** DEPRECATED. Use [[IdentityPreparator]] instead.
-  *
-  * @group Preparator
-  */
-@deprecated("Use IdentityPreparator instead.", "0.9.2")
-object PIdentityPreparator {
-  /** Produces an instance of the class of [[IdentityPreparator]].
-    *
-    * @param ds Instance of the class of the data source for this preparator.
-    */
-  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
-    classOf[IdentityPreparator[TD]]
-}
-
-/** DEPRECATED. Use [[IdentityPreparator]] instead.
-  *
-  * @tparam TD Training data class.
-  * @group Preparator
-  */
-@deprecated("Use IdentityPreparator instead.", "0.9.2")
-class LIdentityPreparator[TD] extends IdentityPreparator[TD]
-
-/** DEPRECATED. Use [[IdentityPreparator]] instead.
-  *
-  * @group Preparator
-  */
-@deprecated("Use IdentityPreparator instead.", "0.9.2")
-object LIdentityPreparator {
-  /** Produces an instance of the class of [[IdentityPreparator]].
-    *
-    * @param ds Instance of the class of the data source for this preparator.
-    */
-  def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] =
-    classOf[IdentityPreparator[TD]]
-}
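
For reference, a minimal sketch of plugging IdentityPreparator into an engine definition, mirroring the SimpleEngine wiring shown earlier; MyDataSource and MyAlgorithm are hypothetical:

    import io.prediction.controller.{Engine, IdentityPreparator, LFirstServing}

    // IdentityPreparator(...) and LFirstServing(...) return the preparator and
    // serving classes expected by the Engine constructor.
    new Engine(
      classOf[MyDataSource],
      IdentityPreparator(classOf[MyDataSource]),
      Map("" -> classOf[MyAlgorithm]),
      LFirstServing(classOf[MyAlgorithm]))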

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LAlgorithm.scala b/core/src/main/scala/io/prediction/controller/LAlgorithm.scala
deleted file mode 100644
index 467a4a0..0000000
--- a/core/src/main/scala/io/prediction/controller/LAlgorithm.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import _root_.io.prediction.annotation.DeveloperApi
-import io.prediction.core.BaseAlgorithm
-import io.prediction.workflow.PersistentModelManifest
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-import scala.reflect._
-
-/** Base class of a local algorithm.
-  *
-  * A local algorithm runs locally within a single machine and produces a model
-  * that can fit within a single machine.
-  *
-  * If your input query class requires custom JSON4S serialization, the most
-  * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
-  * and mix that into your algorithm class, instead of overriding
-  * [[querySerializer]] directly.
-  *
-  * @tparam PD Prepared data class.
-  * @tparam M Trained model class.
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Algorithm
-  */
-abstract class LAlgorithm[PD, M : ClassTag, Q, P]
-  extends BaseAlgorithm[RDD[PD], RDD[M], Q, P] {
-
-  def trainBase(sc: SparkContext, pd: RDD[PD]): RDD[M] = pd.map(train)
-
-  /** Implement this method to produce a model from prepared data.
-    *
-    * @param pd Prepared data for model training.
-    * @return Trained model.
-    */
-  def train(pd: PD): M
-
-  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
-  : RDD[(Long, P)] = {
-    val mRDD = bm.asInstanceOf[RDD[M]]
-    batchPredict(mRDD, qs)
-  }
-
-  /** This is a default implementation to perform batch prediction. Override
-    * this method for a custom implementation.
-    *
-    * @param mRDD A single model wrapped inside an RDD
-    * @param qs An RDD of index-query tuples. The index is used to keep track of
-    *           predicted results with corresponding queries.
-    * @return Batch of predicted results
-    */
-  def batchPredict(mRDD: RDD[M], qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
-    val glomQs: RDD[Array[(Long, Q)]] = qs.glom()
-    val cartesian: RDD[(M, Array[(Long, Q)])] = mRDD.cartesian(glomQs)
-    cartesian.flatMap { case (m, qArray) =>
-      qArray.map { case (qx, q) => (qx, predict(m, q)) }
-    }
-  }
-
-  def predictBase(localBaseModel: Any, q: Q): P = {
-    predict(localBaseModel.asInstanceOf[M], q)
-  }
-
-  /** Implement this method to produce a prediction from a query and trained
-    * model.
-    *
-    * @param m Trained model produced by [[train]].
-    * @param q An input query.
-    * @return A prediction.
-    */
-  def predict(m: M, q: Q): P
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly (read on to see how local
-    * algorithm models are persisted).
-    *
-    * Local algorithms produce local models. By default, models will be
-    * serialized and stored automatically. Engine developers can override this behavior by
-    * mixing the [[PersistentModel]] trait into the model class, and
-    * PredictionIO will call [[PersistentModel.save]] instead. If it returns
-    * true, a [[io.prediction.workflow.PersistentModelManifest]] will be
-    * returned so that during deployment, PredictionIO will use
-    * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
-    * returned and the model will be re-trained on-the-fly.
-    *
-    * @param sc Spark context
-    * @param modelId Model ID
-    * @param algoParams Algorithm parameters that trained this model
-    * @param bm Model
-    * @return The model itself for automatic persistence, an instance of
-    *         [[io.prediction.workflow.PersistentModelManifest]] for manual
-    *         persistence, or Unit for re-training on deployment
-    */
-  @DeveloperApi
-  override
-  def makePersistentModel(
-    sc: SparkContext,
-    modelId: String,
-    algoParams: Params,
-    bm: Any): Any = {
-    // Check RDD[M].count == 1
-    val m = bm.asInstanceOf[RDD[M]].first()
-    if (m.isInstanceOf[PersistentModel[_]]) {
-      if (m.asInstanceOf[PersistentModel[Params]].save(
-        modelId, algoParams, sc)) {
-        PersistentModelManifest(className = m.getClass.getName)
-      } else {
-        Unit
-      }
-    } else {
-      m
-    }
-  }
-}
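
For reference, a minimal hypothetical LAlgorithm implementation; MyQuery and the mean-based model are illustrative only:

    import io.prediction.controller.LAlgorithm

    case class MyQuery(id: String)  // hypothetical query class

    // The model is the mean of the prepared Double values; every query is
    // answered with that mean.
    class MeanAlgorithm extends LAlgorithm[Seq[Double], Double, MyQuery, Double] {
      def train(pd: Seq[Double]): Double = pd.sum / pd.size
      def predict(m: Double, q: MyQuery): Double = m
    }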

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LAverageServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LAverageServing.scala b/core/src/main/scala/io/prediction/controller/LAverageServing.scala
deleted file mode 100644
index 80981ab..0000000
--- a/core/src/main/scala/io/prediction/controller/LAverageServing.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseAlgorithm
-
-/** A concrete implementation of [[LServing]] returning the average of all
-  * algorithms' predictions, where the prediction class is expected to be Double.
-  *
-  * @group Serving
-  */
-class LAverageServing[Q] extends LServing[Q, Double] {
-  /** Returns the average of all algorithms' predictions. */
-  def serve(query: Q, predictions: Seq[Double]): Double = {
-    predictions.sum / predictions.length
-  }
-}
-
-/** A concrete implementation of [[LServing]] returning the average of all
-  * algorithms' predictions, where the prediction class is expected to be Double.
-  *
-  * @group Serving
-  */
-object LAverageServing {
-  /** Returns an instance of [[LAverageServing]]. */
-  def apply[Q](a: Class[_ <: BaseAlgorithm[_, _, Q, _]]): Class[LAverageServing[Q]] =
-    classOf[LAverageServing[Q]]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LDataSource.scala b/core/src/main/scala/io/prediction/controller/LDataSource.scala
deleted file mode 100644
index aa53c8f..0000000
--- a/core/src/main/scala/io/prediction/controller/LDataSource.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseDataSource
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-import scala.reflect._
-
-/** Base class of a local data source.
-  *
-  * A local data source runs locally within a single machine and returns data
-  * that can fit within a single machine.
-  *
-  * @tparam TD Training data class.
-  * @tparam EI Evaluation Info class.
-  * @tparam Q Input query class.
-  * @tparam A Actual value class.
-  * @group Data Source
-  */
-abstract class LDataSource[TD: ClassTag, EI, Q, A]
-  extends BaseDataSource[RDD[TD], EI, Q, A] {
-
-  def readTrainingBase(sc: SparkContext): RDD[TD] = {
-    sc.parallelize(Seq(None)).map(_ => readTraining())
-  }
-
-  /** Implement this method to only return training data from a data source */
-  def readTraining(): TD
-
-  def readEvalBase(sc: SparkContext): Seq[(RDD[TD], EI, RDD[(Q, A)])] = {
-    val localEvalData: Seq[(TD, EI, Seq[(Q, A)])] = readEval()
-
-    localEvalData.map { case (td, ei, qaSeq) => {
-      val tdRDD = sc.parallelize(Seq(None)).map(_ => td)
-      val qaRDD = sc.parallelize(qaSeq)
-      (tdRDD, ei, qaRDD)
-    }}
-  }
-
-  /** To provide the evaluation feature for your engine, you must override this
-    * method to return data for evaluation from a data source. Returned data can
-    * optionally include a sequence of query and actual value pairs for
-    * evaluation purposes.
-    *
-    * The default implementation returns an empty sequence as a stub, so that
-    * an engine can be compiled without implementing evaluation.
-    */
-  def readEval(): Seq[(TD, EI, Seq[(Q, A)])] = Seq[(TD, EI, Seq[(Q, A)])]()
-
-  @deprecated("Use readEval() instead.", "0.9.0")
-  def read(): Seq[(TD, EI, Seq[(Q, A)])] = readEval()
-}
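
For reference, a minimal hypothetical LDataSource implementation; the query and actual-value classes and the file path are illustrative only:

    import io.prediction.controller.LDataSource

    case class MyQuery(id: String)      // hypothetical
    case class MyActual(value: Double)  // hypothetical

    // readEval keeps the default empty stub, so the engine compiles without
    // evaluation support.
    class FileDataSource extends LDataSource[Seq[Double], Unit, MyQuery, MyActual] {
      def readTraining(): Seq[Double] =
        scala.io.Source.fromFile("data/values.txt").getLines().map(_.toDouble).toSeq
    }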

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LFirstServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LFirstServing.scala b/core/src/main/scala/io/prediction/controller/LFirstServing.scala
deleted file mode 100644
index 970815e..0000000
--- a/core/src/main/scala/io/prediction/controller/LFirstServing.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseAlgorithm
-
-/** A concrete implementation of [[LServing]] returning the first algorithm's
-  * prediction result directly without any modification.
-  *
-  * @group Serving
-  */
-class LFirstServing[Q, P] extends LServing[Q, P] {
-  /** Returns the first algorithm's prediction. */
-  def serve(query: Q, predictions: Seq[P]): P = predictions.head
-}
-
-/** A concrete implementation of [[LServing]] returning the first algorithm's
-  * prediction result directly without any modification.
-  *
-  * @group Serving
-  */
-object LFirstServing {
-  /** Returns an instance of [[LFirstServing]]. */
-  def apply[Q, P](a: Class[_ <: BaseAlgorithm[_, _, Q, P]]): Class[LFirstServing[Q, P]] =
-    classOf[LFirstServing[Q, P]]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LPreparator.scala b/core/src/main/scala/io/prediction/controller/LPreparator.scala
deleted file mode 100644
index f66dfc0..0000000
--- a/core/src/main/scala/io/prediction/controller/LPreparator.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BasePreparator
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-import scala.reflect._
-
-/** Base class of a local preparator.
-  *
-  * A local preparator runs locally within a single machine and produces
-  * prepared data that can fit within a single machine.
-  *
-  * @tparam TD Training data class.
-  * @tparam PD Prepared data class.
-  * @group Preparator
-  */
-abstract class LPreparator[TD, PD : ClassTag]
-  extends BasePreparator[RDD[TD], RDD[PD]] {
-
-  def prepareBase(sc: SparkContext, rddTd: RDD[TD]): RDD[PD] = {
-    rddTd.map(prepare)
-  }
-
-  /** Implement this method to produce prepared data that is ready for model
-    * training.
-    *
-    * @param trainingData Training data to be prepared.
-    */
-  def prepare(trainingData: TD): PD
-}
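
For reference, a minimal hypothetical LPreparator implementation:

    import io.prediction.controller.LPreparator

    // Drops NaN values from the training data before it reaches the algorithm.
    class CleaningPreparator extends LPreparator[Seq[Double], Seq[Double]] {
      def prepare(trainingData: Seq[Double]): Seq[Double] =
        trainingData.filterNot(_.isNaN)
    }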

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LServing.scala b/core/src/main/scala/io/prediction/controller/LServing.scala
deleted file mode 100644
index accee48..0000000
--- a/core/src/main/scala/io/prediction/controller/LServing.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.annotation.Experimental
-import io.prediction.core.BaseServing
-
-/** Base class of serving.
-  *
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Serving
-  */
-abstract class LServing[Q, P] extends BaseServing[Q, P] {
-  def supplementBase(q: Q): Q = supplement(q)
-
-  /** :: Experimental ::
-    * Implement this method to supplement the query before sending it to
-    * algorithms.
-    *
-    * @param q Query
-    * @return A supplemented Query
-    */
-  @Experimental
-  def supplement(q: Q): Q = q
-
-  def serveBase(q: Q, ps: Seq[P]): P = {
-    serve(q, ps)
-  }
-
-  /** Implement this method to combine multiple algorithms' predictions to
-    * produce a single final prediction. The query is the original query sent to
-    * the engine, not the supplemented query produced by [[LServing.supplement]].
-    *
-    * @param query Original input query.
-    * @param predictions A list of algorithms' predictions.
-    */
-  def serve(query: Q, predictions: Seq[P]): P
-}
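
For reference, a minimal hypothetical LServing implementation:

    import io.prediction.controller.LServing

    // Returns the largest of all algorithms' Double predictions for a query.
    class MaxServing[Q] extends LServing[Q, Double] {
      def serve(query: Q, predictions: Seq[Double]): Double = predictions.max
    }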

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala b/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala
deleted file mode 100644
index e9f0592..0000000
--- a/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import org.apache.spark.SparkContext
-
-/** This trait is a convenience helper for persisting your model to the local
-  * filesystem. This trait and [[LocalFileSystemPersistentModelLoader]] contain
-  * concrete implementations and need not be implemented.
-  *
-  * The underlying implementation is [[Utils.save]].
-  *
-  * {{{
-  * class MyModel extends LocalFileSystemPersistentModel[MyParams] {
-  *   ...
-  * }
-  *
-  * object MyModel extends LocalFileSystemPersistentModelLoader[MyParams, MyModel] {
-  *   ...
-  * }
-  * }}}
-  *
-  * @tparam AP Algorithm parameters class.
-  * @see [[LocalFileSystemPersistentModelLoader]]
-  * @group Algorithm
-  */
-trait LocalFileSystemPersistentModel[AP <: Params] extends PersistentModel[AP] {
-  def save(id: String, params: AP, sc: SparkContext): Boolean = {
-    Utils.save(id, this)
-    true
-  }
-}
-
-/** Implement an object that extends this trait for PredictionIO to support
-  * loading a persisted model from local filesystem during serving deployment.
-  *
-  * The underlying implementation is [[Utils.load]].
-  *
-  * @tparam AP Algorithm parameters class.
-  * @tparam M Model class.
-  * @see [[LocalFileSystemPersistentModel]]
-  * @group Algorithm
-  */
-trait LocalFileSystemPersistentModelLoader[AP <: Params, M]
-  extends PersistentModelLoader[AP, M] {
-  def apply(id: String, params: AP, sc: Option[SparkContext]): M = {
-    Utils.load(id).asInstanceOf[M]
-  }
-}
-
-/** DEPRECATED. Use [[LocalFileSystemPersistentModel]] instead.
-  *
-  * @group Algorithm */
-@deprecated("Use LocalFileSystemPersistentModel instead.", "0.9.2")
-trait IFSPersistentModel[AP <: Params] extends LocalFileSystemPersistentModel[AP]
-
-/** DEPRECATED. Use [[LocalFileSystemPersistentModelLoader]] instead.
-  *
-  * @group Algorithm */
-@deprecated("Use LocalFileSystemPersistentModelLoader instead.", "0.9.2")
-trait IFSPersistentModelLoader[AP <: Params, M] extends LocalFileSystemPersistentModelLoader[AP, M]
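
For reference, a hedged sketch expanding the scaladoc example above; MyParams and MyModel are hypothetical, and save/apply are inherited from the two traits:

    import io.prediction.controller.{LocalFileSystemPersistentModel,
      LocalFileSystemPersistentModelLoader, Params}

    case class MyParams(rank: Int) extends Params  // hypothetical

    class MyModel(val weights: Array[Double])
      extends LocalFileSystemPersistentModel[MyParams]

    object MyModel
      extends LocalFileSystemPersistentModelLoader[MyParams, MyModel]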

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Metric.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/Metric.scala b/core/src/main/scala/io/prediction/controller/Metric.scala
deleted file mode 100644
index 9e56125..0000000
--- a/core/src/main/scala/io/prediction/controller/Metric.scala
+++ /dev/null
@@ -1,266 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import _root_.io.prediction.controller.java.SerializableComparator
-import io.prediction.core.BaseEngine
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.StatCounter
-
-import scala.Numeric.Implicits._
-import scala.reflect._
-
-/** Base class of a [[Metric]].
-  *
-  * @tparam EI Evaluation information
-  * @tparam Q Query
-  * @tparam P Predicted result
-  * @tparam A Actual result
-  * @tparam R Metric result
-  * @group Evaluation
-  */
-abstract class Metric[EI, Q, P, A, R](implicit rOrder: Ordering[R])
-extends Serializable {
-  /** Java friendly constructor
-    *
-    * @param comparator A serializable comparator for sorting the metric results.
-    *
-    */
-  def this(comparator: SerializableComparator[R]) = {
-    this()(Ordering.comparatorToOrdering(comparator))
-  }
-
-  /** Class name of this [[Metric]]. */
-  def header: String = this.getClass.getSimpleName
-
-  /** Calculates the result of this [[Metric]]. */
-  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): R
-
-  /** Comparison function for R's ordering. */
-  def compare(r0: R, r1: R): Int = rOrder.compare(r0, r1)
-}
-
-private [prediction] trait StatsMetricHelper[EI, Q, P, A] {
-  def calculate(q: Q, p: P, a: A): Double
-
-  def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
-  : StatCounter = {
-    val doubleRDD = sc.union(
-      evalDataSet.map { case (_, qpaRDD) =>
-        qpaRDD.map { case (q, p, a) => calculate(q, p, a) }
-      }
-    )
-
-    doubleRDD.stats()
-  }
-}
-
-private [prediction] trait StatsOptionMetricHelper[EI, Q, P, A] {
-  def calculate(q: Q, p: P, a: A): Option[Double]
-
-  def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
-  : StatCounter = {
-    val doubleRDD = sc.union(
-      evalDataSet.map { case (_, qpaRDD) =>
-        qpaRDD.flatMap { case (q, p, a) => calculate(q, p, a) }
-      }
-    )
-
-    doubleRDD.stats()
-  }
-}
-
-/** Returns the global average of the score returned by the calculate method.
-  *
-  * @tparam EI Evaluation information
-  * @tparam Q Query
-  * @tparam P Predicted result
-  * @tparam A Actual result
-  *
-  * @group Evaluation
-  */
-abstract class AverageMetric[EI, Q, P, A]
-    extends Metric[EI, Q, P, A, Double]
-    with StatsMetricHelper[EI, Q, P, A]
-    with QPAMetric[Q, P, A, Double] {
-  /** Implement this method to return a score that will be used for averaging
-    * across all QPA tuples.
-    */
-  def calculate(q: Q, p: P, a: A): Double
-
-  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
-  : Double = {
-    calculateStats(sc, evalDataSet).mean
-  }
-}
-
-/** Returns the global average of the non-None score returned by the calculate
-  * method.
-  *
-  * @tparam EI Evaluation information
-  * @tparam Q Query
-  * @tparam P Predicted result
-  * @tparam A Actual result
-  *
-  * @group Evaluation
-  */
-abstract class OptionAverageMetric[EI, Q, P, A]
-    extends Metric[EI, Q, P, A, Double]
-    with StatsOptionMetricHelper[EI, Q, P, A]
-    with QPAMetric[Q, P, A, Option[Double]] {
-  /** Implement this method to return a score that will be used for averaging
-    * across all QPA tuples.
-    */
-  def calculate(q: Q, p: P, a: A): Option[Double]
-
-  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
-  : Double = {
-    calculateStats(sc, evalDataSet).mean
-  }
-}
-
-/** Returns the global standard deviation of the score returned by the
-  * calculate method
-  *
-  * This class uses the org.apache.spark.util.StatCounter library; the
-  * statistics are computed in a single pass.
-  *
-  * @tparam EI Evaluation information
-  * @tparam Q Query
-  * @tparam P Predicted result
-  * @tparam A Actual result
-  *
-  * @group Evaluation
-  */
-abstract class StdevMetric[EI, Q, P, A]
-    extends Metric[EI, Q, P, A, Double]
-    with StatsMetricHelper[EI, Q, P, A]
-    with QPAMetric[Q, P, A, Double] {
-  /** Implement this method to return a score that will be used for calculating
-    * the stdev
-    * across all QPA tuples.
-    */
-  def calculate(q: Q, p: P, a: A): Double
-
-  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
-  : Double = {
-    calculateStats(sc, evalDataSet).stdev
-  }
-}
-
-/** Returns the global standard deviation of the non-None score returned by
-  * the calculate method
-  *
-  * This class uses the org.apache.spark.util.StatCounter library; the
-  * statistics are computed in a single pass.
-  *
-  * @tparam EI Evaluation information
-  * @tparam Q Query
-  * @tparam P Predicted result
-  * @tparam A Actual result
-  *
-  * @group Evaluation
-  */
-abstract class OptionStdevMetric[EI, Q, P, A]
-    extends Metric[EI, Q, P, A, Double]
-    with StatsOptionMetricHelper[EI, Q, P, A]
-    with QPAMetric[Q, P, A, Option[Double]] {
-  /** Implement this method to return a score that will be used for calculating
-    * the stdev
-    * across all QPA tuples.
-    */
-  def calculate(q: Q, p: P, a: A): Option[Double]
-
-  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
-  : Double = {
-    calculateStats(sc, evalDataSet).stdev
-  }
-}
-
-/** Returns the sum of the score returned by the calculate method.
-  *
-  * @tparam EI Evaluation information
-  * @tparam Q Query
-  * @tparam P Predicted result
-  * @tparam A Actual result
-  * @tparam R Result, output of the function calculate, must be Numeric
-  *
-  * @group Evaluation
-  */
-abstract class SumMetric[EI, Q, P, A, R: ClassTag](implicit num: Numeric[R])
-    extends Metric[EI, Q, P, A, R]()(num)
-    with QPAMetric[Q, P, A, R] {
-  /** Implement this method to return a score that will be used for summing
-    * across all QPA tuples.
-    */
-  def calculate(q: Q, p: P, a: A): R
-
-  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
-  : R = {
-    val union: RDD[R] = sc.union(
-      evalDataSet.map { case (_, qpaRDD) =>
-        qpaRDD.map { case (q, p, a) => calculate(q, p, a) }
-      }
-    )
-
-    union.aggregate[R](num.zero)(_ + _, _ + _)
-  }
-}
-
-/** Returns zero. Useful as a placeholder during evaluation development when not
-  * all components are implemented.
-  * @tparam EI Evaluation information
-  * @tparam Q Query
-  * @tparam P Predicted result
-  * @tparam A Actual result
-  *
-  * @group Evaluation
-  */
-class ZeroMetric[EI, Q, P, A] extends Metric[EI, Q, P, A, Double]() {
-   def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): Double = 0.0
-}
-
-/** Companion object of [[ZeroMetric]]
-  *
-  * @group Evaluation
-  */
-object ZeroMetric {
-  /** Returns a ZeroMetric instance using Engine's type parameters. */
-  def apply[EI, Q, P, A](engine: BaseEngine[EI, Q, P, A]): ZeroMetric[EI, Q, P, A] = {
-    new ZeroMetric[EI, Q, P, A]()
-  }
-}
-
-
-/** Trait for metric which returns a score based on Query, PredictedResult,
-  * and ActualResult
-  *
-  * @tparam Q Query class
-  * @tparam P Predicted result class
-  * @tparam A Actual result class
-  * @tparam R Metric result class
-  * @group Evaluation
-  */
-trait QPAMetric[Q, P, A, R] {
-  /** Calculate a metric result based on query, predicted result, and actual
-    * result
-    *
-    * @param q Query
-    * @param p Predicted result
-    * @param a Actual result
-    * @return Metric result
-    */
-  def calculate(q: Q, p: P, a: A): R
-}
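
For reference, a minimal hypothetical metric built on AverageMetric:

    import io.prediction.controller.AverageMetric

    // Each (query, prediction, actual) tuple scores 1.0 on an exact match and
    // 0.0 otherwise; AverageMetric averages the scores over all evaluation sets.
    class Accuracy[EI, Q] extends AverageMetric[EI, Q, Double, Double] {
      def calculate(q: Q, p: Double, a: Double): Double = if (p == a) 1.0 else 0.0
    }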

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala b/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala
deleted file mode 100644
index 41ccc9c..0000000
--- a/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import _root_.java.io.File
-import _root_.java.io.PrintWriter
-
-import com.github.nscala_time.time.Imports.DateTime
-import grizzled.slf4j.Logger
-import io.prediction.annotation.DeveloperApi
-import io.prediction.core.BaseEvaluator
-import io.prediction.core.BaseEvaluatorResult
-import io.prediction.data.storage.Storage
-import io.prediction.workflow.JsonExtractor
-import io.prediction.workflow.JsonExtractorOption.Both
-import io.prediction.workflow.NameParamsSerializer
-import io.prediction.workflow.WorkflowParams
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.json4s.native.Serialization.write
-import org.json4s.native.Serialization.writePretty
-
-import scala.language.existentials
-
-/** Case class storing a primary score, and other scores
-  *
-  * @param score Primary metric score
-  * @param otherScores Other scores this metric might have
-  * @tparam R Type of the primary metric score
-  * @group Evaluation
-  */
-case class MetricScores[R](
-  score: R,
-  otherScores: Seq[Any])
-
-/** Contains all results of a [[MetricEvaluator]]
-  *
-  * @param bestScore The best score among all iterations
-  * @param bestEngineParams The set of engine parameters that yielded the best score
-  * @param bestIdx The index of iteration that yielded the best score
-  * @param metricHeader Brief description of the primary metric score
-  * @param otherMetricHeaders Brief descriptions of other metric scores
-  * @param engineParamsScores All sets of engine parameters and corresponding metric scores
-  * @param outputPath An optional output path where scores are saved
-  * @tparam R Type of the primary metric score
-  * @group Evaluation
-  */
-case class MetricEvaluatorResult[R](
-  bestScore: MetricScores[R],
-  bestEngineParams: EngineParams,
-  bestIdx: Int,
-  metricHeader: String,
-  otherMetricHeaders: Seq[String],
-  engineParamsScores: Seq[(EngineParams, MetricScores[R])],
-  outputPath: Option[String])
-extends BaseEvaluatorResult {
-
-  override def toOneLiner(): String = {
-    val idx = engineParamsScores.map(_._1).indexOf(bestEngineParams)
-    s"Best Params Index: $idx Score: ${bestScore.score}"
-  }
-
-  override def toJSON(): String = {
-    implicit lazy val formats = Utils.json4sDefaultFormats +
-      new NameParamsSerializer
-    write(this)
-  }
-
-  override def toHTML(): String = html.metric_evaluator().toString()
-
-  override def toString: String = {
-    implicit lazy val formats = Utils.json4sDefaultFormats +
-      new NameParamsSerializer
-
-    val bestEPStr = JsonExtractor.engineParamstoPrettyJson(Both, bestEngineParams)
-
-    val strings = Seq(
-      "MetricEvaluatorResult:",
-      s"  # engine params evaluated: ${engineParamsScores.size}") ++
-      Seq(
-        "Optimal Engine Params:",
-        s"  $bestEPStr",
-        "Metrics:",
-        s"  $metricHeader: ${bestScore.score}") ++
-      otherMetricHeaders.zip(bestScore.otherScores).map {
-        case (h, s) => s"  $h: $s"
-      } ++
-      outputPath.toSeq.map {
-        p => s"The best variant params can be found in $p"
-      }
-
-    strings.mkString("\n")
-  }
-}
-
-/** Companion object of [[MetricEvaluator]]
-  *
-  * @group Evaluation
-  */
-object MetricEvaluator {
-  def apply[EI, Q, P, A, R](
-    metric: Metric[EI, Q, P, A, R],
-    otherMetrics: Seq[Metric[EI, Q, P, A, _]],
-    outputPath: String): MetricEvaluator[EI, Q, P, A, R] = {
-    new MetricEvaluator[EI, Q, P, A, R](
-      metric,
-      otherMetrics,
-      Some(outputPath))
-  }
-
-  def apply[EI, Q, P, A, R](
-    metric: Metric[EI, Q, P, A, R],
-    otherMetrics: Seq[Metric[EI, Q, P, A, _]])
-  : MetricEvaluator[EI, Q, P, A, R] = {
-    new MetricEvaluator[EI, Q, P, A, R](
-      metric,
-      otherMetrics,
-      None)
-  }
-
-  def apply[EI, Q, P, A, R](metric: Metric[EI, Q, P, A, R])
-  : MetricEvaluator[EI, Q, P, A, R] = {
-    new MetricEvaluator[EI, Q, P, A, R](
-      metric,
-      Seq[Metric[EI, Q, P, A, _]](),
-      None)
-  }
-
-  case class NameParams(name: String, params: Params) {
-    def this(np: (String, Params)) = this(np._1, np._2)
-  }
-
-  case class EngineVariant(
-    id: String,
-    description: String,
-    engineFactory: String,
-    datasource: NameParams,
-    preparator: NameParams,
-    algorithms: Seq[NameParams],
-    serving: NameParams) {
-
-    def this(evaluation: Evaluation, engineParams: EngineParams) = this(
-      id = "",
-      description = "",
-      engineFactory = evaluation.getClass.getName,
-      datasource = new NameParams(engineParams.dataSourceParams),
-      preparator = new NameParams(engineParams.preparatorParams),
-    algorithms = engineParams.algorithmParamsList.map(np => new NameParams(np)),
-      serving = new NameParams(engineParams.servingParams))
-  }
-}
-
-/** :: DeveloperApi ::
-  * Do not use this directly. Use [[MetricEvaluator$]] instead. This is an
-  * implementation of [[io.prediction.core.BaseEvaluator]] that evaluates
-  * prediction performance based on metric scores.
-  *
-  * @param metric Primary metric
-  * @param otherMetrics Other metrics
-  * @param outputPath Optional output path to save evaluation results
-  * @tparam EI Evaluation information type
-  * @tparam Q Query class
-  * @tparam P Predicted result class
-  * @tparam A Actual result class
-  * @tparam R Metric result class
-  * @group Evaluation
-  */
-@DeveloperApi
-class MetricEvaluator[EI, Q, P, A, R] (
-  val metric: Metric[EI, Q, P, A, R],
-  val otherMetrics: Seq[Metric[EI, Q, P, A, _]],
-  val outputPath: Option[String])
-  extends BaseEvaluator[EI, Q, P, A, MetricEvaluatorResult[R]] {
-  @transient lazy val logger = Logger[this.type]
-  @transient val engineInstances = Storage.getMetaDataEngineInstances()
-
-  def saveEngineJson(
-    evaluation: Evaluation,
-    engineParams: EngineParams,
-    outputPath: String) {
-
-    val now = DateTime.now
-    val evalClassName = evaluation.getClass.getName
-
-    val variant = MetricEvaluator.EngineVariant(
-      id = s"$evalClassName $now",
-      description = "",
-      engineFactory = evalClassName,
-      datasource = new MetricEvaluator.NameParams(engineParams.dataSourceParams),
-      preparator = new MetricEvaluator.NameParams(engineParams.preparatorParams),
-      algorithms = engineParams.algorithmParamsList.map(np => new MetricEvaluator.NameParams(np)),
-      serving = new MetricEvaluator.NameParams(engineParams.servingParams))
-
-    implicit lazy val formats = Utils.json4sDefaultFormats
-
-    logger.info(s"Writing best variant params to disk ($outputPath)...")
-    val writer = new PrintWriter(new File(outputPath))
-    writer.write(writePretty(variant))
-    writer.close()
-  }
-
-  def evaluateBase(
-    sc: SparkContext,
-    evaluation: Evaluation,
-    engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])],
-    params: WorkflowParams): MetricEvaluatorResult[R] = {
-
-    val evalResultList: Seq[(EngineParams, MetricScores[R])] = engineEvalDataSet
-    .zipWithIndex
-    .par
-    .map { case ((engineParams, evalDataSet), idx) =>
-      val metricScores = MetricScores[R](
-        metric.calculate(sc, evalDataSet),
-        otherMetrics.map(_.calculate(sc, evalDataSet)))
-      (engineParams, metricScores)
-    }
-    .seq
-
-    implicit lazy val formats = Utils.json4sDefaultFormats +
-      new NameParamsSerializer
-
-    evalResultList.zipWithIndex.foreach { case ((ep, r), idx) =>
-      logger.info(s"Iteration $idx")
-      logger.info(s"EngineParams: ${JsonExtractor.engineParamsToJson(Both, ep)}")
-      logger.info(s"Result: $r")
-    }
-
-    // Keep the best (maximum) score, using the ordering defined by Metric.compare.
-    val ((bestEngineParams, bestScore), bestIdx) = evalResultList
-    .zipWithIndex
-    .reduce { (x, y) =>
-      if (metric.compare(x._1._2.score, y._1._2.score) >= 0) x else y
-    }
-
-    // save engine params if it is set.
-    outputPath.foreach { path => saveEngineJson(evaluation, bestEngineParams, path) }
-
-    MetricEvaluatorResult(
-      bestScore = bestScore,
-      bestEngineParams = bestEngineParams,
-      bestIdx = bestIdx,
-      metricHeader = metric.header,
-      otherMetricHeaders = otherMetrics.map(_.header),
-      engineParamsScores = evalResultList,
-      outputPath = outputPath)
-  }
-}
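
The companion object above offers three apply overloads. A hedged usage sketch (the Precision and Recall metrics below are assumed to exist elsewhere and are not part of this commit; only the overload that takes an output path is shown):

    import io.prediction.controller.MetricEvaluator

    // Precision and Recall are hypothetical Metric implementations. Supplying
    // an output path makes the evaluator write the best variant's engine
    // params to that file via saveEngineJson.
    val evaluator = MetricEvaluator(
      metric = new Precision,
      otherMetrics = Seq(new Recall),
      outputPath = "best.json")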

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala b/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala
deleted file mode 100644
index c59b9af..0000000
--- a/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import _root_.io.prediction.annotation.DeveloperApi
-import io.prediction.core.BaseAlgorithm
-import io.prediction.workflow.PersistentModelManifest
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import scala.reflect._
-
-/** Base class of a parallel-to-local algorithm.
-  *
-  * A parallel-to-local algorithm can be run in parallel on a cluster and
-  * produces a model that can fit within a single machine.
-  *
-  * If your input query class requires custom JSON4S serialization, the most
-  * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
-  * and mix that into your algorithm class, instead of overriding
-  * [[querySerializer]] directly.
-  *
-  * @tparam PD Prepared data class.
-  * @tparam M Trained model class.
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Algorithm
-  */
-abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
-  extends BaseAlgorithm[PD, M, Q, P] {
-
-  def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd)
-
-  /** Implement this method to produce a model from prepared data.
-    *
-    * @param pd Prepared data for model training.
-    * @return Trained model.
-    */
-  def train(sc: SparkContext, pd: PD): M
-
-  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
-  : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs)
-
-  /** This is a default implementation to perform batch prediction. Override
-    * this method for a custom implementation.
-    *
-    * @param m A model
-    * @param qs An RDD of index-query tuples. The index is used to keep track of
-    *           predicted results with corresponding queries.
-    * @return Batch of predicted results
-    */
-  def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
-    qs.mapValues { q => predict(m, q) }
-  }
-
-  def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q)
-
-  /** Implement this method to produce a prediction from a query and trained
-    * model.
-    *
-    * @param model Trained model produced by [[train]].
-    * @param query An input query.
-    * @return A prediction.
-    */
-  def predict(model: M, query: Q): P
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly (read on to see how
-    * parallel-to-local algorithm models are persisted).
-    *
-    * Parallel-to-local algorithms produce local models. By default, models will be
-    * serialized and stored automatically. Engine developers can override this behavior by
-    * mixing the [[PersistentModel]] trait into the model class, and
-    * PredictionIO will call [[PersistentModel.save]] instead. If it returns
-    * true, a [[io.prediction.workflow.PersistentModelManifest]] will be
-    * returned so that during deployment, PredictionIO will use
-    * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
-    * returned and the model will be re-trained on-the-fly.
-    *
-    * @param sc Spark context
-    * @param modelId Model ID
-    * @param algoParams Algorithm parameters that trained this model
-    * @param bm Model
-    * @return The model itself for automatic persistence, an instance of
-    *         [[io.prediction.workflow.PersistentModelManifest]] for manual
-    *         persistence, or Unit for re-training on deployment
-    */
-  @DeveloperApi
-  override
-  def makePersistentModel(
-    sc: SparkContext,
-    modelId: String,
-    algoParams: Params,
-    bm: Any): Any = {
-    val m = bm.asInstanceOf[M]
-    if (m.isInstanceOf[PersistentModel[_]]) {
-      if (m.asInstanceOf[PersistentModel[Params]].save(
-        modelId, algoParams, sc)) {
-        PersistentModelManifest(className = m.getClass.getName)
-      } else {
-        Unit
-      }
-    } else {
-      m
-    }
-  }
-}
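
A parallel-to-local algorithm only has to supply train and predict; batch prediction and model persistence are handled by the defaults shown above. A sketch with made-up prepared data, model, query, and prediction types (all assumptions, not from this commit):

    import io.prediction.controller.{ P2LAlgorithm, Params }
    import org.apache.spark.SparkContext

    // Hypothetical parameter and data types for illustration only.
    case class MyAlgoParams(smoothing: Double = 0.0) extends Params
    case class MyPreparedData(labels: Seq[Double])
    case class MyModel(mean: Double)
    case class MyQuery(id: String)
    case class MyPrediction(score: Double)

    class MyP2LAlgorithm(ap: MyAlgoParams)
      extends P2LAlgorithm[MyPreparedData, MyModel, MyQuery, MyPrediction] {

      // Produce a model small enough to live on a single machine.
      def train(sc: SparkContext, pd: MyPreparedData): MyModel =
        MyModel(if (pd.labels.isEmpty) 0.0 else pd.labels.sum / pd.labels.size)

      // Serve one query against the in-memory model.
      def predict(model: MyModel, query: MyQuery): MyPrediction =
        MyPrediction(model.mean + ap.smoothing)
    }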

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/PAlgorithm.scala b/core/src/main/scala/io/prediction/controller/PAlgorithm.scala
deleted file mode 100644
index e9916be..0000000
--- a/core/src/main/scala/io/prediction/controller/PAlgorithm.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.annotation.DeveloperApi
-import io.prediction.core.BaseAlgorithm
-import io.prediction.workflow.PersistentModelManifest
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-/** Base class of a parallel algorithm.
-  *
-  * A parallel algorithm can be run in parallel on a cluster and produces a
-  * model that can also be distributed across a cluster.
-  *
-  * If your input query class requires custom JSON4S serialization, the most
-  * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
-  * and mix that into your algorithm class, instead of overriding
-  * [[querySerializer]] directly.
-  *
-  * To provide evaluation feature, one must override and implement the
-  * [[batchPredict]] method. Otherwise, an exception will be thrown when `pio eval`
-  * is used.
-  *
-  * @tparam PD Prepared data class.
-  * @tparam M Trained model class.
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Algorithm
-  */
-abstract class PAlgorithm[PD, M, Q, P]
-  extends BaseAlgorithm[PD, M, Q, P] {
-
-  def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd)
-
-  /** Implement this method to produce a model from prepared data.
-    *
-    * @param pd Prepared data for model training.
-    * @return Trained model.
-    */
-  def train(sc: SparkContext, pd: PD): M
-
-  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
-  : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs)
-
-  /** To provide evaluation feature, one must override and implement this method
-    * to generate many predictions in batch. Otherwise, an exception will be
-    * thrown when `pio eval` is used.
-    *
-    * The default implementation throws an exception.
-    *
-    * @param m Trained model produced by [[train]].
-    * @param qs An RDD of index-query tuples. The index is used to keep track of
-    *           predicted results with corresponding queries.
-    */
-  def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] =
-    throw new NotImplementedError("batchPredict not implemented")
-
-  def predictBase(baseModel: Any, query: Q): P = {
-    predict(baseModel.asInstanceOf[M], query)
-  }
-
-  /** Implement this method to produce a prediction from a query and trained
-    * model.
-    *
-    * @param model Trained model produced by [[train]].
-    * @param query An input query.
-    * @return A prediction.
-    */
-  def predict(model: M, query: Q): P
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly (read on to see how parallel
-    * algorithm models are persisted).
-    *
-    * In general, parallel models may contain multiple RDDs. It is not easy to
-    * infer and persist them programmatically since these RDDs may be
-    * potentially huge. To persist these models, engine developers need to mix
-    * the [[PersistentModel]] trait into the model class and implement
-    * [[PersistentModel.save]]. If it returns true, a
-    * [[io.prediction.workflow.PersistentModelManifest]] will be
-    * returned so that during deployment, PredictionIO will use
-    * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
-    * returned and the model will be re-trained on-the-fly.
-    *
-    * @param sc Spark context
-    * @param modelId Model ID
-    * @param algoParams Algorithm parameters that trained this model
-    * @param bm Model
-    * @return The model itself for automatic persistence, an instance of
-    *         [[io.prediction.workflow.PersistentModelManifest]] for manual
-    *         persistence, or Unit for re-training on deployment
-    */
-  @DeveloperApi
-  override
-  def makePersistentModel(
-    sc: SparkContext,
-    modelId: String,
-    algoParams: Params,
-    bm: Any): Any = {
-    val m = bm.asInstanceOf[M]
-    if (m.isInstanceOf[PersistentModel[_]]) {
-      if (m.asInstanceOf[PersistentModel[Params]].save(
-        modelId, algoParams, sc)) {
-        PersistentModelManifest(className = m.getClass.getName)
-      } else {
-        Unit
-      }
-    } else {
-      Unit
-    }
-  }
-}
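
In contrast to P2LAlgorithm, a PAlgorithm model can wrap RDDs, and batchPredict must be overridden before `pio eval` can be used, since the default above throws. A sketch with assumed types (none of these names are part of this commit):

    import io.prediction.controller.PAlgorithm
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Illustrative types only.
    case class PreparedScores(data: RDD[(String, Double)])
    case class ScoreModel(scores: RDD[(String, Double)])
    case class ScoreQuery(id: String)
    case class Score(value: Double)

    class MyPAlgorithm
      extends PAlgorithm[PreparedScores, ScoreModel, ScoreQuery, Score] {

      def train(sc: SparkContext, pd: PreparedScores): ScoreModel =
        ScoreModel(pd.data.cache())

      // Single-query serving path: look the id up in the distributed model.
      def predict(model: ScoreModel, query: ScoreQuery): Score =
        Score(model.scores.lookup(query.id).headOption.getOrElse(0.0))

      // Required for evaluation: score all indexed queries in one distributed pass.
      override def batchPredict(m: ScoreModel, qs: RDD[(Long, ScoreQuery)]): RDD[(Long, Score)] =
        qs.map { case (idx, q) => (q.id, idx) }
          .leftOuterJoin(m.scores)
          .map { case (_, (idx, s)) => (idx, Score(s.getOrElse(0.0))) }
    }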

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/PDataSource.scala b/core/src/main/scala/io/prediction/controller/PDataSource.scala
deleted file mode 100644
index 55a2cf9..0000000
--- a/core/src/main/scala/io/prediction/controller/PDataSource.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BaseDataSource
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-/** Base class of a parallel data source.
-  *
-  * A parallel data source runs locally within a single machine, or in parallel
-  * on a cluster, to return data that is distributed across a cluster.
-  *
-  * @tparam TD Training data class.
-  * @tparam EI Evaluation Info class.
-  * @tparam Q Input query class.
-  * @tparam A Actual value class.
-  * @group Data Source
-  */
-
-abstract class PDataSource[TD, EI, Q, A]
-  extends BaseDataSource[TD, EI, Q, A] {
-
-  def readTrainingBase(sc: SparkContext): TD = readTraining(sc)
-
-  /** Implement this method to only return training data from a data source */
-  def readTraining(sc: SparkContext): TD
-
-  def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc)
-
-  /** To provide evaluation feature for your engine, you must override this
-    * method to return data for evaluation from a data source. Returned data can
-    * optionally include a sequence of query and actual value pairs for
-    * evaluation purpose.
-    *
-    * The default implementation returns an empty sequence as a stub, so that
-    * an engine can be compiled without implementing evaluation.
-    */
-  def readEval(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] =
-    Seq[(TD, EI, RDD[(Q, A)])]()
-
-  @deprecated("Use readEval() instead.", "0.9.0")
-  def read(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc)
-}
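
A data source only needs readTraining to get an engine training; readEval can keep the empty default until evaluation is wired up. A sketch with assumed types (a real implementation would read from the event store rather than parallelize a literal):

    import io.prediction.controller.PDataSource
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Illustrative types only.
    case class MyTrainingData(ratings: RDD[(String, String, Double)])
    case class MyEvalInfo(name: String)
    case class MyQuery(user: String)
    case class MyActual(item: String)

    class MyDataSource
      extends PDataSource[MyTrainingData, MyEvalInfo, MyQuery, MyActual] {

      // Required: produce training data, distributed across the cluster.
      def readTraining(sc: SparkContext): MyTrainingData =
        MyTrainingData(sc.parallelize(Seq(("u1", "i1", 4.0), ("u2", "i2", 5.0))))

      // readEval is inherited; override it only when `pio eval` support is needed.
    }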

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/PPreparator.scala b/core/src/main/scala/io/prediction/controller/PPreparator.scala
deleted file mode 100644
index 154560b..0000000
--- a/core/src/main/scala/io/prediction/controller/PPreparator.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.core.BasePreparator
-import org.apache.spark.SparkContext
-
-/** Base class of a parallel preparator.
-  *
-  * A parallel preparator can be run in parallel on a cluster and produces a
-  * prepared data that is distributed across a cluster.
-  *
-  * @tparam TD Training data class.
-  * @tparam PD Prepared data class.
-  * @group Preparator
-  */
-abstract class PPreparator[TD, PD]
-  extends BasePreparator[TD, PD] {
-
-  def prepareBase(sc: SparkContext, td: TD): PD = {
-    prepare(sc, td)
-  }
-
-  /** Implement this method to produce prepared data that is ready for model
-    * training.
-    *
-    * @param sc An Apache Spark context.
-    * @param trainingData Training data to be prepared.
-    */
-  def prepare(sc: SparkContext, trainingData: TD): PD
-}
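
A preparator is essentially a single function from training data to prepared data. A sketch, reusing the hypothetical rating types from the data source sketch above:

    import io.prediction.controller.PPreparator
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Illustrative types only.
    case class MyTrainingData(ratings: RDD[(String, String, Double)])
    case class MyPreparedData(ratings: RDD[(String, String, Double)])

    class MyPreparator extends PPreparator[MyTrainingData, MyPreparedData] {
      // Stand-in for real feature engineering: drop non-positive ratings.
      def prepare(sc: SparkContext, trainingData: MyTrainingData): MyPreparedData =
        MyPreparedData(trainingData.ratings.filter(_._3 > 0))
    }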

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Params.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/Params.scala b/core/src/main/scala/io/prediction/controller/Params.scala
deleted file mode 100644
index 0d5d149..0000000
--- a/core/src/main/scala/io/prediction/controller/Params.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-/** Base trait for all kinds of parameters that will be passed to constructors
-  * of different controller classes.
-  *
-  * @group Helper
-  */
-trait Params extends Serializable {}
-
-/** A concrete implementation of [[Params]] representing empty parameters.
-  *
-  * @group Helper
-  */
-case class EmptyParams() extends Params {
-  override def toString(): String = "Empty"
-}
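
Concretely, parameter classes are plain serializable case classes extending Params, so they can be bound from engine.json. A sketch (the field names are assumptions):

    import io.prediction.controller.Params

    // Hypothetical algorithm parameters; defaults apply when a field is
    // omitted from the corresponding engine.json entry.
    case class MyAlgorithmParams(
      rank: Int = 10,
      numIterations: Int = 20,
      lambda: Double = 0.01) extends Params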

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PersistentModel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/PersistentModel.scala b/core/src/main/scala/io/prediction/controller/PersistentModel.scala
deleted file mode 100644
index 5d0ec41..0000000
--- a/core/src/main/scala/io/prediction/controller/PersistentModel.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import org.apache.spark.SparkContext
-
-/** Mix in and implement this trait if your model cannot be persisted by
-  * PredictionIO automatically. A companion object extending
-  * [[PersistentModelLoader]] is required for PredictionIO to load the persisted
-  * model automatically during deployment.
-  *
-  * Notice that models generated by [[PAlgorithm]] cannot be persisted
-  * automatically by nature and must implement these traits if model persistence
-  * is desired.
-  *
-  * {{{
-  * class MyModel extends PersistentModel[MyParams] {
-  *   def save(id: String, params: MyParams, sc: SparkContext): Boolean = {
-  *     ...
-  *   }
-  * }
-  *
-  * object MyModel extends PersistentModelLoader[MyParams, MyModel] {
-  *   def apply(id: String, params: MyParams, sc: Option[SparkContext]): MyModel = {
-  *     ...
-  *   }
-  * }
-  * }}}
-  *
-  * In Java, all you need to do is to implement this interface, and add a static
-  * method with 3 arguments of type String, [[Params]], and SparkContext.
-  *
-  * {{{
-  * public class MyModel implements PersistentModel<MyParams>, Serializable {
-  *   ...
-  *   public boolean save(String id, MyParams params, SparkContext sc) {
-  *     ...
-  *   }
-  *
-  *   public static MyModel load(String id, Params params, SparkContext sc) {
-  *     ...
-  *   }
-  *   ...
-  * }
-  * }}}
-  *
-  * @tparam AP Algorithm parameters class.
-  * @see [[PersistentModelLoader]]
-  * @group Algorithm
-  */
-trait PersistentModel[AP <: Params] {
-  /** Save the model to some persistent storage.
-    *
-    * This method should return true if the model has been saved successfully so
-    * that PredictionIO knows that it can be restored later during deployment.
-    * This method should return false if the model cannot be saved (or should
-    * not be saved due to configuration) so that PredictionIO will re-train the
-    * model during deployment. All arguments of this method are provided
-    * automatically by PredictionIO.
-    *
-    * @param id ID of the run that trained this model.
-    * @param params Algorithm parameters that were used to train this model.
-    * @param sc An Apache Spark context.
-    */
-  def save(id: String, params: AP, sc: SparkContext): Boolean
-}
-
-/** Implement an object that extends this trait for PredictionIO to support
-  * loading a persisted model during serving deployment.
-  *
-  * @tparam AP Algorithm parameters class.
-  * @tparam M Model class.
-  * @see [[PersistentModel]]
-  * @group Algorithm
-  */
-trait PersistentModelLoader[AP <: Params, M] {
-  /** Implement this method to restore a persisted model that extends the
-    * [[PersistentModel]] trait. All arguments of this method are provided
-    * automatically by PredictionIO.
-    *
-    * @param id ID of the run that trained this model.
-    * @param params Algorithm parameters that were used to train this model.
-    * @param sc An optional Apache Spark context. This will be injected if the
-    *           model was generated by a [[PAlgorithm]].
-    */
-  def apply(id: String, params: AP, sc: Option[SparkContext]): M
-}
-
-/** DEPRECATED. Use [[PersistentModel]] instead.
-  *
-  * @group Algorithm */
-@deprecated("Use PersistentModel instead.", "0.9.2")
-trait IPersistentModel[AP <: Params] extends PersistentModel[AP]
-
-/** DEPRECATED. Use [[PersistentModelLoader]] instead.
-  *
-  * @group Algorithm */
-@deprecated("Use PersistentModelLoader instead.", "0.9.2")
-trait IPersistentModelLoader[AP <: Params, M] extends PersistentModelLoader[AP, M]
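
A hedged, concrete version of the skeleton in the Scaladoc above, persisting a small model through Spark object files under an assumed local path (a real engine would choose durable storage):

    import io.prediction.controller.{ Params, PersistentModel, PersistentModelLoader }
    import org.apache.spark.SparkContext

    // Hypothetical parameters; modelDir is an assumption for this sketch.
    case class MyParams(modelDir: String = "/tmp/pio-models") extends Params

    class MyModel(val weights: Map[String, Double])
      extends PersistentModel[MyParams] with Serializable {

      // Returning true tells PredictionIO the model can be reloaded at deploy time.
      def save(id: String, params: MyParams, sc: SparkContext): Boolean = {
        sc.parallelize(Seq(weights), 1).saveAsObjectFile(s"${params.modelDir}/$id")
        true
      }
    }

    object MyModel extends PersistentModelLoader[MyParams, MyModel] {
      // This sketch assumes a SparkContext is available at load time (sc.get).
      def apply(id: String, params: MyParams, sc: Option[SparkContext]): MyModel =
        new MyModel(
          sc.get.objectFile[Map[String, Double]](s"${params.modelDir}/$id").first())
    }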

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/SanityCheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/SanityCheck.scala b/core/src/main/scala/io/prediction/controller/SanityCheck.scala
deleted file mode 100644
index bb5342f..0000000
--- a/core/src/main/scala/io/prediction/controller/SanityCheck.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package io.prediction.controller
-
-/** Extend a data class with this trait if you want PredictionIO to
-  * automatically perform sanity check on your data classes during training.
-  * This is very useful when you need to debug your engine.
-  *
-  * @group Helper
-  */
-trait SanityCheck {
-  /** Implement this method to perform checks on your data. This method should
-    * contain assertions that throw exceptions when your data does not meet
-    * your pre-defined requirement.
-    */
-  def sanityCheck(): Unit
-}
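
As a quick sketch, the trait is usually mixed into training or prepared data classes so that bad input fails fast during training (the MyTrainingData fields below are assumptions, not part of this commit):

    import io.prediction.controller.SanityCheck

    // Hypothetical training data class for illustration.
    case class MyTrainingData(ratings: Seq[(String, String, Double)])
      extends SanityCheck {

      // Assertions here should throw when the data violates expectations.
      def sanityCheck(): Unit = {
        require(ratings.nonEmpty, "ratings must not be empty")
        require(ratings.forall(_._3 >= 0), "ratings must be non-negative")
      }
    }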

