http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Metric.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Metric.scala b/core/src/main/scala/org/apache/predictionio/controller/Metric.scala
new file mode 100644
index 0000000..cc27984
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Metric.scala
@@ -0,0 +1,266 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import _root_.org.apache.predictionio.controller.java.SerializableComparator
+import org.apache.predictionio.core.BaseEngine
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+import scala.Numeric.Implicits._
+import scala.reflect._
+
+/** Base class of a [[Metric]].
+  *
+  * @tparam EI Evaluation information
+  * @tparam Q Query
+  * @tparam P Predicted result
+  * @tparam A Actual result
+  * @tparam R Metric result
+  * @group Evaluation
+  */
+abstract class Metric[EI, Q, P, A, R](implicit rOrder: Ordering[R])
+extends Serializable {
+  /** Java friendly constructor
+    *
+    * @param comparator A serializable comparator for sorting the metric results.
+    *
+    */
+  def this(comparator: SerializableComparator[R]) = {
+    this()(Ordering.comparatorToOrdering(comparator))
+  }
+
+  /** Class name of this [[Metric]]. */
+  def header: String = this.getClass.getSimpleName
+
+  /** Calculates the result of this [[Metric]]. */
+  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): R
+
+  /** Comparison function for R's ordering. */
+  def compare(r0: R, r1: R): Int = rOrder.compare(r0, r1)
+}
+
+private [predictionio] trait StatsMetricHelper[EI, Q, P, A] {
+  def calculate(q: Q, p: P, a: A): Double
+
+  def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+  : StatCounter = {
+    val doubleRDD = sc.union(
+      evalDataSet.map { case (_, qpaRDD) =>
+        qpaRDD.map { case (q, p, a) => calculate(q, p, a) }
+      }
+    )
+
+    doubleRDD.stats()
+  }
+}
+
+private [predictionio] trait StatsOptionMetricHelper[EI, Q, P, A] {
+  def calculate(q: Q, p: P, a: A): Option[Double]
+
+  def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+  : StatCounter = {
+    val doubleRDD = sc.union(
+      evalDataSet.map { case (_, qpaRDD) =>
+        qpaRDD.flatMap { case (q, p, a) => calculate(q, p, a) }
+      }
+    )
+
+    doubleRDD.stats()
+  }
+}
+
+/** Returns the global average of the score returned by the calculate method.
+  *
+  * @tparam EI Evaluation information
+  * @tparam Q Query
+  * @tparam P Predicted result
+  * @tparam A Actual result
+  *
+  * @group Evaluation
+  */
+abstract class AverageMetric[EI, Q, P, A]
+    extends Metric[EI, Q, P, A, Double]
+    with StatsMetricHelper[EI, Q, P, A]
+    with QPAMetric[Q, P, A, Double] {
+  /** Implement this method to return a score that will be used for averaging
+    * across all QPA tuples.
+    */
+  def calculate(q: Q, p: P, a: A): Double
+
+  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+  : Double = {
+    calculateStats(sc, evalDataSet).mean
+  }
+}
+
+/** Returns the global average of the non-None score returned by the calculate
+  * method.
+  *
+  * @tparam EI Evaluation information
+  * @tparam Q Query
+  * @tparam P Predicted result
+  * @tparam A Actual result
+  *
+  * @group Evaluation
+  */
+abstract class OptionAverageMetric[EI, Q, P, A]
+    extends Metric[EI, Q, P, A, Double]
+    with StatsOptionMetricHelper[EI, Q, P, A]
+    with QPAMetric[Q, P, A, Option[Double]] {
+  /** Implement this method to return a score that will be used for averaging
+    * across all QPA tuples.
+    */
+  def calculate(q: Q, p: P, a: A): Option[Double]
+
+  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+  : Double = {
+    calculateStats(sc, evalDataSet).mean
+  }
+}
+
+/** Returns the global standard deviation of the score returned by the
+  * calculate method.
+  *
+  * This metric uses org.apache.spark.util.StatCounter, which computes the
+  * statistics in a single pass.
+  *
+  * @tparam EI Evaluation information
+  * @tparam Q Query
+  * @tparam P Predicted result
+  * @tparam A Actual result
+  *
+  * @group Evaluation
+  */
+abstract class StdevMetric[EI, Q, P, A]
+    extends Metric[EI, Q, P, A, Double]
+    with StatsMetricHelper[EI, Q, P, A]
+    with QPAMetric[Q, P, A, Double] {
+  /** Implement this method to return a score that will be used for
+    * calculating the stdev across all QPA tuples.
+    */
+  def calculate(q: Q, p: P, a: A): Double
+
+  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+  : Double = {
+    calculateStats(sc, evalDataSet).stdev
+  }
+}
+
+/** Returns the global standard deviation of the non-None score returned by
+  * the calculate method.
+  *
+  * This metric uses org.apache.spark.util.StatCounter, which computes the
+  * statistics in a single pass.
+  *
+  * @tparam EI Evaluation information
+  * @tparam Q Query
+  * @tparam P Predicted result
+  * @tparam A Actual result
+  *
+  * @group Evaluation
+  */
+abstract class OptionStdevMetric[EI, Q, P, A]
+    extends Metric[EI, Q, P, A, Double]
+    with StatsOptionMetricHelper[EI, Q, P, A]
+    with QPAMetric[Q, P, A, Option[Double]] {
+  /** Implement this method to return a score that will be used for
+    * calculating the stdev across all QPA tuples.
+    */
+  def calculate(q: Q, p: P, a: A): Option[Double]
+
+  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+  : Double = {
+    calculateStats(sc, evalDataSet).stdev
+  }
+}
+
+/** Returns the sum of the score returned by the calculate method.
+  *
+  * @tparam EI Evaluation information
+  * @tparam Q Query
+  * @tparam P Predicted result
+  * @tparam A Actual result
+  * @tparam R Result, output of the function calculate, must be Numeric
+  *
+  * @group Evaluation
+  */
+abstract class SumMetric[EI, Q, P, A, R: ClassTag](implicit num: Numeric[R])
+    extends Metric[EI, Q, P, A, R]()(num)
+    with QPAMetric[Q, P, A, R] {
+  /** Implement this method to return a score that will be used for summing
+    * across all QPA tuples.
+    */
+  def calculate(q: Q, p: P, a: A): R
+
+  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+  : R = {
+    val union: RDD[R] = sc.union(
+      evalDataSet.map { case (_, qpaRDD) =>
+        qpaRDD.map { case (q, p, a) => calculate(q, p, a) }
+      }
+    )
+
+    union.aggregate[R](num.zero)(_ + _, _ + _)
+  }
+}
+
+/** Returns zero. Useful as a placeholder during evaluation development when
+  * not all components are implemented.
+  * @tparam EI Evaluation information
+  * @tparam Q Query
+  * @tparam P Predicted result
+  * @tparam A Actual result
+  *
+  * @group Evaluation
+  */
+class ZeroMetric[EI, Q, P, A] extends Metric[EI, Q, P, A, Double]() {
+  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): Double = 0.0
+}
+
+/** Companion object of [[ZeroMetric]]
+  *
+  * @group Evaluation
+  */
+object ZeroMetric {
+  /** Returns a ZeroMetric instance using Engine's type parameters. */
+  def apply[EI, Q, P, A](engine: BaseEngine[EI, Q, P, A]): ZeroMetric[EI, Q, P, A] = {
+    new ZeroMetric[EI, Q, P, A]()
+  }
+}
+
+
+/** Trait for metric which returns a score based on Query, PredictedResult,
+  * and ActualResult
+  *
+  * @tparam Q Query class
+  * @tparam P Predicted result class
+  * @tparam A Actual result class
+  * @tparam R Metric result class
+  * @group Evaluation
+  */
+trait QPAMetric[Q, P, A, R] {
+  /** Calculate a metric result based on query, predicted result, and actual
+    * result
+    *
+    * @param q Query
+    * @param p Predicted result
+    * @param a Actual result
+    * @return Metric result
+    */
+  def calculate(q: Q, p: P, a: A): R
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala
new file mode 100644
index 0000000..b707041
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala
@@ -0,0 +1,260 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import _root_.java.io.File
+import _root_.java.io.PrintWriter
+
+import com.github.nscala_time.time.Imports.DateTime
+import grizzled.slf4j.Logger
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.core.BaseEvaluator
+import org.apache.predictionio.core.BaseEvaluatorResult
+import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.workflow.JsonExtractor
+import org.apache.predictionio.workflow.JsonExtractorOption.Both
+import org.apache.predictionio.workflow.NameParamsSerializer
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.json4s.native.Serialization.write
+import org.json4s.native.Serialization.writePretty
+
+import scala.language.existentials
+
+/** Case class storing a primary score, and other scores
+  *
+  * @param score Primary metric score
+  * @param otherScores Other scores this metric might have
+  * @tparam R Type of the primary metric score
+  * @group Evaluation
+  */
+case class MetricScores[R](
+  score: R,
+  otherScores: Seq[Any])
+
+/** Contains all results of a [[MetricEvaluator]]
+  *
+  * @param bestScore The best score among all iterations
+  * @param bestEngineParams The set of engine parameters that yielded the best score
+  * @param bestIdx The index of iteration that yielded the best score
+  * @param metricHeader Brief description of the primary metric score
+  * @param otherMetricHeaders Brief descriptions of other metric scores
+  * @param engineParamsScores All sets of engine parameters and corresponding metric scores
+  * @param outputPath An optional output path where scores are saved
+  * @tparam R Type of the primary metric score
+  * @group Evaluation
+  */
+case class MetricEvaluatorResult[R](
+  bestScore: MetricScores[R],
+  bestEngineParams: EngineParams,
+  bestIdx: Int,
+  metricHeader: String,
+  otherMetricHeaders: Seq[String],
+  engineParamsScores: Seq[(EngineParams, MetricScores[R])],
+  outputPath: Option[String])
+extends BaseEvaluatorResult {
+
+  override def toOneLiner(): String = {
+    val idx = engineParamsScores.map(_._1).indexOf(bestEngineParams)
+    s"Best Params Index: $idx Score: ${bestScore.score}"
+  }
+
+  override def toJSON(): String = {
+    implicit lazy val formats = Utils.json4sDefaultFormats +
+      new NameParamsSerializer
+    write(this)
+  }
+
+  override def toHTML(): String = html.metric_evaluator().toString()
+
+  override def toString: String = {
+    implicit lazy val formats = Utils.json4sDefaultFormats +
+      new NameParamsSerializer
+
+    val bestEPStr = JsonExtractor.engineParamstoPrettyJson(Both, bestEngineParams)
+
+    val strings = Seq(
+      "MetricEvaluatorResult:",
+      s"  # engine params evaluated: ${engineParamsScores.size}") ++
+      Seq(
+        "Optimal Engine Params:",
+        s"  $bestEPStr",
+        "Metrics:",
+        s"  $metricHeader: ${bestScore.score}") ++
+      otherMetricHeaders.zip(bestScore.otherScores).map {
+        case (h, s) => s"  $h: $s"
+      } ++
+      outputPath.toSeq.map {
+        p => s"The best variant params can be found in $p"
+      }
+
+    strings.mkString("\n")
+  }
+}
+
+/** Companion object of [[MetricEvaluator]]
+  *
+  * @group Evaluation
+  */
+object MetricEvaluator {
+  def apply[EI, Q, P, A, R](
+    metric: Metric[EI, Q, P, A, R],
+    otherMetrics: Seq[Metric[EI, Q, P, A, _]],
+    outputPath: String): MetricEvaluator[EI, Q, P, A, R] = {
+    new MetricEvaluator[EI, Q, P, A, R](
+      metric,
+      otherMetrics,
+      Some(outputPath))
+  }
+
+  def apply[EI, Q, P, A, R](
+    metric: Metric[EI, Q, P, A, R],
+    otherMetrics: Seq[Metric[EI, Q, P, A, _]])
+  : MetricEvaluator[EI, Q, P, A, R] = {
+    new MetricEvaluator[EI, Q, P, A, R](
+      metric,
+      otherMetrics,
+      None)
+  }
+
+  def apply[EI, Q, P, A, R](metric: Metric[EI, Q, P, A, R])
+  : MetricEvaluator[EI, Q, P, A, R] = {
+    new MetricEvaluator[EI, Q, P, A, R](
+      metric,
+      Seq[Metric[EI, Q, P, A, _]](),
+      None)
+  }
+
+  case class NameParams(name: String, params: Params) {
+    def this(np: (String, Params)) = this(np._1, np._2)
+  }
+
+  case class EngineVariant(
+    id: String,
+    description: String,
+    engineFactory: String,
+    datasource: NameParams,
+    preparator: NameParams,
+    algorithms: Seq[NameParams],
+    serving: NameParams) {
+
+    def this(evaluation: Evaluation, engineParams: EngineParams) = this(
+      id = "",
+      description = "",
+      engineFactory = evaluation.getClass.getName,
+      datasource = new NameParams(engineParams.dataSourceParams),
+      preparator = new NameParams(engineParams.preparatorParams),
+      algorithms = engineParams.algorithmParamsList.map(np => new NameParams(np)),
+      serving = new NameParams(engineParams.servingParams))
+  }
+}
+
+/** :: DeveloperApi ::
+  * Do not use this directly. Use [[MetricEvaluator$]] instead. This is an
+  * implementation of [[org.apache.predictionio.core.BaseEvaluator]] that evaluates
+  * prediction performance based on metric scores.
+  *
+  * @param metric Primary metric
+  * @param otherMetrics Other metrics
+  * @param outputPath Optional output path to save evaluation results
+  * @tparam EI Evaluation information type
+  * @tparam Q Query class
+  * @tparam P Predicted result class
+  * @tparam A Actual result class
+  * @tparam R Metric result class
+  * @group Evaluation
+  */
+@DeveloperApi
+class MetricEvaluator[EI, Q, P, A, R] (
+  val metric: Metric[EI, Q, P, A, R],
+  val otherMetrics: Seq[Metric[EI, Q, P, A, _]],
+  val outputPath: Option[String])
+  extends BaseEvaluator[EI, Q, P, A, MetricEvaluatorResult[R]] {
+  @transient lazy val logger = Logger[this.type]
+  @transient val engineInstances = Storage.getMetaDataEngineInstances()
+
+  def saveEngineJson(
+    evaluation: Evaluation,
+    engineParams: EngineParams,
+    outputPath: String) {
+
+    val now = DateTime.now
+    val evalClassName = evaluation.getClass.getName
+
+    val variant = MetricEvaluator.EngineVariant(
+      id = s"$evalClassName $now",
+      description = "",
+      engineFactory = evalClassName,
+      datasource = new MetricEvaluator.NameParams(engineParams.dataSourceParams),
+      preparator = new MetricEvaluator.NameParams(engineParams.preparatorParams),
+      algorithms = engineParams.algorithmParamsList.map(np => new MetricEvaluator.NameParams(np)),
+      serving = new MetricEvaluator.NameParams(engineParams.servingParams))
+
+    implicit lazy val formats = Utils.json4sDefaultFormats
+
+    logger.info(s"Writing best variant params to disk ($outputPath)...")
+    val writer = new PrintWriter(new File(outputPath))
+    writer.write(writePretty(variant))
+    writer.close()
+  }
+
+  def evaluateBase(
+    sc: SparkContext,
+    evaluation: Evaluation,
+    engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])],
+    params: WorkflowParams): MetricEvaluatorResult[R] = {
+
+    val evalResultList: Seq[(EngineParams, MetricScores[R])] = engineEvalDataSet
+    .zipWithIndex
+    .par
+    .map { case ((engineParams, evalDataSet), idx) =>
+      val metricScores = MetricScores[R](
+        metric.calculate(sc, evalDataSet),
+        otherMetrics.map(_.calculate(sc, evalDataSet)))
+      (engineParams, metricScores)
+    }
+    .seq
+
+    implicit lazy val formats = Utils.json4sDefaultFormats +
+      new NameParamsSerializer
+
+    evalResultList.zipWithIndex.foreach { case ((ep, r), idx) =>
+      logger.info(s"Iteration $idx")
+      logger.info(s"EngineParams: ${JsonExtractor.engineParamsToJson(Both, ep)}")
+      logger.info(s"Result: $r")
+    }
+
+    // Take the maximum score, using the Ordering supplied implicitly by the Metric.
+    val ((bestEngineParams, bestScore), bestIdx) = evalResultList
+    .zipWithIndex
+    .reduce { (x, y) =>
+      if (metric.compare(x._1._2.score, y._1._2.score) >= 0) x else y
+    }
+
+    // Save the best engine params to disk if an output path is set.
+    outputPath.foreach { path => saveEngineJson(evaluation, bestEngineParams, path) }
+
+    MetricEvaluatorResult(
+      bestScore = bestScore,
+      bestEngineParams = bestEngineParams,
+      bestIdx = bestIdx,
+      metricHeader = metric.header,
+      otherMetricHeaders = otherMetrics.map(_.header),
+      engineParamsScores = evalResultList,
+      outputPath = outputPath)
+  }
+}
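
For reference, a minimal usage sketch (not part of this patch) of the companion object's apply overloads; the metric classes and the "best.json" output path below are hypothetical.

  import org.apache.predictionio.controller.{ AverageMetric, MetricEvaluator }

  // Hypothetical engine classes, for illustration only.
  case class EvalInfo(name: String)
  case class Query(user: String)
  case class PredictedResult(score: Double)
  case class ActualResult(score: Double)

  class MeanAbsoluteError
    extends AverageMetric[EvalInfo, Query, PredictedResult, ActualResult] {
    def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
      math.abs(p.score - a.score)
  }

  object EvaluatorSketch {
    // Primary metric only.
    val simple = MetricEvaluator(new MeanAbsoluteError())

    // Primary metric, secondary metrics, and an output path to which the best
    // engine variant JSON is written by saveEngineJson.
    val withOutput = MetricEvaluator(
      new MeanAbsoluteError(),
      Seq(new MeanAbsoluteError()),
      "best.json")
  }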

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala
new file mode 100644
index 0000000..cb9f7c4
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala
@@ -0,0 +1,121 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import _root_.org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.workflow.PersistentModelManifest
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import scala.reflect._
+
+/** Base class of a parallel-to-local algorithm.
+  *
+  * A parallel-to-local algorithm can be run in parallel on a cluster and
+  * produces a model that can fit within a single machine.
+  *
+  * If your input query class requires custom JSON4S serialization, the most
+  * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
+  * and mix that into your algorithm class, instead of overriding
+  * [[querySerializer]] directly.
+  *
+  * @tparam PD Prepared data class.
+  * @tparam M Trained model class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Algorithm
+  */
+abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
+  extends BaseAlgorithm[PD, M, Q, P] {
+
+  def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd)
+
+  /** Implement this method to produce a model from prepared data.
+    *
+    * @param pd Prepared data for model training.
+    * @return Trained model.
+    */
+  def train(sc: SparkContext, pd: PD): M
+
+  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
+  : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs)
+
+  /** This is a default implementation to perform batch prediction. Override
+    * this method for a custom implementation.
+    *
+    * @param m A model
+    * @param qs An RDD of index-query tuples. The index is used to keep track of
+    *           predicted results with corresponding queries.
+    * @return Batch of predicted results
+    */
+  def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = {
+    qs.mapValues { q => predict(m, q) }
+  }
+
+  def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q)
+
+  /** Implement this method to produce a prediction from a query and trained
+    * model.
+    *
+    * @param model Trained model produced by [[train]].
+    * @param query An input query.
+    * @return A prediction.
+    */
+  def predict(model: M, query: Q): P
+
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly (read on to see how
+    * parallel-to-local algorithm models are persisted).
+    *
+    * Parallel-to-local algorithms produce local models. By default, models will be
+    * serialized and stored automatically. Engine developers can override this behavior by
+    * mixing the [[PersistentModel]] trait into the model class, and
+    * PredictionIO will call [[PersistentModel.save]] instead. If it returns
+    * true, a [[org.apache.predictionio.workflow.PersistentModelManifest]] will be
+    * returned so that during deployment, PredictionIO will use
+    * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
+    * returned and the model will be re-trained on-the-fly.
+    *
+    * @param sc Spark context
+    * @param modelId Model ID
+    * @param algoParams Algorithm parameters that trained this model
+    * @param bm Model
+    * @return The model itself for automatic persistence, an instance of
+    *         [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual
+    *         persistence, or Unit for re-training on deployment
+    */
+  @DeveloperApi
+  override
+  def makePersistentModel(
+    sc: SparkContext,
+    modelId: String,
+    algoParams: Params,
+    bm: Any): Any = {
+    val m = bm.asInstanceOf[M]
+    if (m.isInstanceOf[PersistentModel[_]]) {
+      if (m.asInstanceOf[PersistentModel[Params]].save(
+        modelId, algoParams, sc)) {
+        PersistentModelManifest(className = m.getClass.getName)
+      } else {
+        Unit
+      }
+    } else {
+      m
+    }
+  }
+}
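
For reference, a minimal sketch (not part of this patch) of a parallel-to-local algorithm: training runs on the cluster, while the resulting model is a plain local object. All classes below are hypothetical.

  import org.apache.predictionio.controller.{ P2LAlgorithm, Params }
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  case class AlgoParams(regularization: Double) extends Params
  case class PreparedData(ratings: RDD[(String, Double)])
  case class Model(averages: Map[String, Double])
  case class Query(user: String)
  case class PredictedResult(score: Double)

  class MeanScoreAlgorithm(ap: AlgoParams)
    extends P2LAlgorithm[PreparedData, Model, Query, PredictedResult] {

    // Aggregates per-user averages on the cluster and collects them into a
    // small local Map, which PredictionIO can serialize automatically.
    def train(sc: SparkContext, pd: PreparedData): Model = {
      val averages = pd.ratings
        .mapValues(r => (r, 1L))
        .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
        .mapValues { case (sum, n) => sum / n }
        .collectAsMap()
        .toMap
      Model(averages)
    }

    def predict(model: Model, query: Query): PredictedResult =
      PredictedResult(model.averages.getOrElse(query.user, 0.0))
  }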

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala
new file mode 100644
index 0000000..d2123f3
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala
@@ -0,0 +1,126 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.core.BaseAlgorithm
+import org.apache.predictionio.workflow.PersistentModelManifest
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** Base class of a parallel algorithm.
+  *
+  * A parallel algorithm can be run in parallel on a cluster and produces a
+  * model that can also be distributed across a cluster.
+  *
+  * If your input query class requires custom JSON4S serialization, the most
+  * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
+  * and mix that into your algorithm class, instead of overriding
+  * [[querySerializer]] directly.
+  *
+  * To support evaluation, one must override and implement the
+  * [[batchPredict]] method. Otherwise, an exception will be thrown when
+  * `pio eval` is used.
+  *
+  * @tparam PD Prepared data class.
+  * @tparam M Trained model class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Algorithm
+  */
+abstract class PAlgorithm[PD, M, Q, P]
+  extends BaseAlgorithm[PD, M, Q, P] {
+
+  def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd)
+
+  /** Implement this method to produce a model from prepared data.
+    *
+    * @param pd Prepared data for model training.
+    * @return Trained model.
+    */
+  def train(sc: SparkContext, pd: PD): M
+
+  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
+  : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs)
+
+  /** To support evaluation, one must override and implement this method
+    * to generate many predictions in batch. Otherwise, an exception will be
+    * thrown when `pio eval` is used.
+    *
+    * The default implementation throws an exception.
+    *
+    * @param m Trained model produced by [[train]].
+    * @param qs An RDD of index-query tuples. The index is used to keep track of
+    *           predicted results with corresponding queries.
+    */
+  def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] =
+    throw new NotImplementedError("batchPredict not implemented")
+
+  def predictBase(baseModel: Any, query: Q): P = {
+    predict(baseModel.asInstanceOf[M], query)
+  }
+
+  /** Implement this method to produce a prediction from a query and trained
+    * model.
+    *
+    * @param model Trained model produced by [[train]].
+    * @param query An input query.
+    * @return A prediction.
+    */
+  def predict(model: M, query: Q): P
+
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly (read on to see how parallel
+    * algorithm models are persisted).
+    *
+    * In general, parallel models may contain multiple RDDs. It is not easy to
+    * infer and persist them programmatically since these RDDs may be
+    * potentially huge. To persist these models, engine developers need to mix
+    * the [[PersistentModel]] trait into the model class and implement
+    * [[PersistentModel.save]]. If it returns true, a
+    * [[org.apache.predictionio.workflow.PersistentModelManifest]] will be
+    * returned so that during deployment, PredictionIO will use
+    * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
+    * returned and the model will be re-trained on-the-fly.
+    *
+    * @param sc Spark context
+    * @param modelId Model ID
+    * @param algoParams Algorithm parameters that trained this model
+    * @param bm Model
+    * @return The model itself for automatic persistence, an instance of
+    *         [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual
+    *         persistence, or Unit for re-training on deployment
+    */
+  @DeveloperApi
+  override
+  def makePersistentModel(
+    sc: SparkContext,
+    modelId: String,
+    algoParams: Params,
+    bm: Any): Any = {
+    val m = bm.asInstanceOf[M]
+    if (m.isInstanceOf[PersistentModel[_]]) {
+      if (m.asInstanceOf[PersistentModel[Params]].save(
+        modelId, algoParams, sc)) {
+        PersistentModelManifest(className = m.getClass.getName)
+      } else {
+        Unit
+      }
+    } else {
+      Unit
+    }
+  }
+}
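
For reference, a minimal sketch (not part of this patch) of a parallel algorithm whose model is itself an RDD, with batchPredict overridden so that `pio eval` works. All classes below are hypothetical.

  import org.apache.predictionio.controller.PAlgorithm
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  case class PreparedData(scores: RDD[(String, Double)])
  case class Model(scores: RDD[(String, Double)])
  case class Query(user: String)
  case class PredictedResult(score: Double)

  class RDDScoreAlgorithm
    extends PAlgorithm[PreparedData, Model, Query, PredictedResult] {

    def train(sc: SparkContext, pd: PreparedData): Model = Model(pd.scores)

    // A real engine would index the distributed model instead of filtering it
    // for every query; this keeps the sketch short.
    def predict(model: Model, query: Query): PredictedResult = {
      val hit = model.scores.filter { case (user, _) => user == query.user }
      PredictedResult(hit.values.collect().headOption.getOrElse(0.0))
    }

    // Overriding batchPredict enables evaluation; the default implementation
    // throws NotImplementedError.
    override def batchPredict(
      m: Model, qs: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = {
      val byUser = qs.map { case (idx, q) => (q.user, idx) }
      byUser.leftOuterJoin(m.scores).map { case (_, (idx, scoreOpt)) =>
        (idx, PredictedResult(scoreOpt.getOrElse(0.0)))
      }
    }
  }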

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala
new file mode 100644
index 0000000..e595dca
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala
@@ -0,0 +1,57 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** Base class of a parallel data source.
+  *
+  * A parallel data source runs locally within a single machine, or in parallel
+  * on a cluster, to return data that is distributed across a cluster.
+  *
+  * @tparam TD Training data class.
+  * @tparam EI Evaluation Info class.
+  * @tparam Q Input query class.
+  * @tparam A Actual value class.
+  * @group Data Source
+  */
+
+abstract class PDataSource[TD, EI, Q, A]
+  extends BaseDataSource[TD, EI, Q, A] {
+
+  def readTrainingBase(sc: SparkContext): TD = readTraining(sc)
+
+  /** Implement this method to only return training data from a data source */
+  def readTraining(sc: SparkContext): TD
+
+  def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc)
+
+  /** To provide the evaluation feature for your engine, you must override this
+    * method to return data for evaluation from a data source. Returned data can
+    * optionally include a sequence of query and actual value pairs for
+    * evaluation purpose.
+    *
+    * The default implementation returns an empty sequence as a stub, so that
+    * an engine can be compiled without implementing evaluation.
+    */
+  def readEval(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] =
+    Seq[(TD, EI, RDD[(Q, A)])]()
+
+  @deprecated("Use readEval() instead.", "0.9.0")
+  def read(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc)
+}
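
For reference, a minimal sketch (not part of this patch) of a parallel data source that reads "user,score" lines from a text file and provides a single holdout split for evaluation. The file format and all classes below are hypothetical.

  import org.apache.predictionio.controller.PDataSource
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  case class TrainingData(ratings: RDD[(String, Double)])
  case class EvalInfo(name: String)
  case class Query(user: String)
  case class ActualResult(score: Double)

  class FileDataSource(path: String)
    extends PDataSource[TrainingData, EvalInfo, Query, ActualResult] {

    // Each input line is assumed to be "user,score".
    def readTraining(sc: SparkContext): TrainingData = {
      val ratings = sc.textFile(path).map { line =>
        val Array(user, score) = line.split(",")
        (user, score.toDouble)
      }
      TrainingData(ratings)
    }

    // One 80/20 train/test split; readEval defaults to an empty sequence when
    // evaluation is not needed.
    override def readEval(sc: SparkContext)
    : Seq[(TrainingData, EvalInfo, RDD[(Query, ActualResult)])] = {
      val all = readTraining(sc).ratings
      val Array(train, test) = all.randomSplit(Array(0.8, 0.2), seed = 42L)
      val qa = test.map { case (user, score) => (Query(user), ActualResult(score)) }
      Seq((TrainingData(train), EvalInfo("holdout"), qa))
    }
  }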

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala
new file mode 100644
index 0000000..66f51e4
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala
@@ -0,0 +1,44 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BasePreparator
+import org.apache.spark.SparkContext
+
+/** Base class of a parallel preparator.
+  *
+  * A parallel preparator can be run in parallel on a cluster and produces
+  * prepared data that is distributed across a cluster.
+  *
+  * @tparam TD Training data class.
+  * @tparam PD Prepared data class.
+  * @group Preparator
+  */
+abstract class PPreparator[TD, PD]
+  extends BasePreparator[TD, PD] {
+
+  def prepareBase(sc: SparkContext, td: TD): PD = {
+    prepare(sc, td)
+  }
+
+  /** Implement this method to produce prepared data that is ready for model
+    * training.
+    *
+    * @param sc An Apache Spark context.
+    * @param trainingData Training data to be prepared.
+    */
+  def prepare(sc: SparkContext, trainingData: TD): PD
+}
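
For reference, a minimal sketch (not part of this patch) of a parallel preparator; the classes and the accepted score range are hypothetical.

  import org.apache.predictionio.controller.PPreparator
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  case class TrainingData(ratings: RDD[(String, Double)])
  case class PreparedData(ratings: RDD[(String, Double)])

  // Drops out-of-range scores before training; the prepared data stays
  // distributed across the cluster.
  class CleaningPreparator extends PPreparator[TrainingData, PreparedData] {
    def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
      PreparedData(trainingData.ratings.filter {
        case (_, score) => score >= 0.0 && score <= 5.0
      })
  }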

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Params.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Params.scala b/core/src/main/scala/org/apache/predictionio/controller/Params.scala
new file mode 100644
index 0000000..bdb3f7e
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Params.scala
@@ -0,0 +1,31 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+/** Base trait for all kinds of parameters that will be passed to constructors
+  * of different controller classes.
+  *
+  * @group Helper
+  */
+trait Params extends Serializable {}
+
+/** A concrete implementation of [[Params]] representing empty parameters.
+  *
+  * @group Helper
+  */
+case class EmptyParams() extends Params {
+  override def toString(): String = "Empty"
+}
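
For reference, a minimal sketch (not part of this patch): a controller's parameters are usually a case class extending Params, so that they can be populated from the corresponding section of engine.json. The fields below are hypothetical.

  import org.apache.predictionio.controller.Params

  case class AlgorithmParams(
    rank: Int = 10,
    numIterations: Int = 20,
    lambda: Double = 0.01) extends Params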

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala b/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala
new file mode 100644
index 0000000..fb8d57b
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala
@@ -0,0 +1,112 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.spark.SparkContext
+
+/** Mix in and implement this trait if your model cannot be persisted by
+  * PredictionIO automatically. A companion object extending
+  * [[PersistentModelLoader]] is required for PredictionIO to load the persisted
+  * model automatically during deployment.
+  *
+  * Notice that models generated by [[PAlgorithm]] cannot be persisted
+  * automatically by nature and must implement these traits if model persistence
+  * is desired.
+  *
+  * {{{
+  * class MyModel extends PersistentModel[MyParams] {
+  *   def save(id: String, params: MyParams, sc: SparkContext): Boolean = {
+  *     ...
+  *   }
+  * }
+  *
+  * object MyModel extends PersistentModelLoader[MyParams, MyModel] {
+  *   def apply(id: String, params: MyParams, sc: Option[SparkContext]): MyModel = {
+  *     ...
+  *   }
+  * }
+  * }}}
+  *
+  * In Java, all you need to do is to implement this interface, and add a static
+  * method with 3 arguments of type String, [[Params]], and SparkContext.
+  *
+  * {{{
+  * public class MyModel implements PersistentModel<MyParams>, Serializable {
+  *   ...
+  *   public boolean save(String id, MyParams params, SparkContext sc) {
+  *     ...
+  *   }
+  *
+  *   public static MyModel load(String id, Params params, SparkContext sc) {
+  *     ...
+  *   }
+  *   ...
+  * }
+  * }}}
+  *
+  * @tparam AP Algorithm parameters class.
+  * @see [[PersistentModelLoader]]
+  * @group Algorithm
+  */
+trait PersistentModel[AP <: Params] {
+  /** Save the model to some persistent storage.
+    *
+    * This method should return true if the model has been saved successfully so
+    * that PredictionIO knows that it can be restored later during deployment.
+    * This method should return false if the model cannot be saved (or should
+    * not be saved due to configuration) so that PredictionIO will re-train the
+    * model during deployment. All arguments of this method are provided
+    * automatically by PredictionIO.
+    *
+    * @param id ID of the run that trained this model.
+    * @param params Algorithm parameters that were used to train this model.
+    * @param sc An Apache Spark context.
+    */
+  def save(id: String, params: AP, sc: SparkContext): Boolean
+}
+
+/** Implement an object that extends this trait for PredictionIO to support
+  * loading a persisted model during serving deployment.
+  *
+  * @tparam AP Algorithm parameters class.
+  * @tparam M Model class.
+  * @see [[PersistentModel]]
+  * @group Algorithm
+  */
+trait PersistentModelLoader[AP <: Params, M] {
+  /** Implement this method to restore a persisted model that extends the
+    * [[PersistentModel]] trait. All arguments of this method are provided
+    * automatically by PredictionIO.
+    *
+    * @param id ID of the run that trained this model.
+    * @param params Algorithm parameters that were used to train this model.
+    * @param sc An optional Apache Spark context. This will be injected if the
+    *           model was generated by a [[PAlgorithm]].
+    */
+  def apply(id: String, params: AP, sc: Option[SparkContext]): M
+}
+
+/** DEPRECATED. Use [[PersistentModel]] instead.
+  *
+  * @group Algorithm */
+@deprecated("Use PersistentModel instead.", "0.9.2")
+trait IPersistentModel[AP <: Params] extends PersistentModel[AP]
+
+/** DEPRECATED. Use [[PersistentModelLoader]] instead.
+  *
+  * @group Algorithm */
+@deprecated("Use PersistentModelLoader instead.", "0.9.2")
+trait IPersistentModelLoader[AP <: Params, M] extends PersistentModelLoader[AP, M]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala b/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala
new file mode 100644
index 0000000..8449a71
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala
@@ -0,0 +1,30 @@
+/** Copyright 2015 TappingStone, Inc.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.predictionio.controller
+
+/** Extend a data class with this trait if you want PredictionIO to
+  * automatically perform a sanity check on your data classes during training.
+  * This is very useful when you need to debug your engine.
+  *
+  * @group Helper
+  */
+trait SanityCheck {
+  /** Implement this method to perform checks on your data. This method should
+    * contain assertions that throw exceptions when your data does not meet
+    * your pre-defined requirement.
+    */
+  def sanityCheck(): Unit
+}
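
For reference, a minimal sketch (not part of this patch) of mixing SanityCheck into a training data class; the TrainingData class and its checks are hypothetical.

  import org.apache.predictionio.controller.SanityCheck
  import org.apache.spark.rdd.RDD

  case class TrainingData(ratings: RDD[(String, Double)]) extends SanityCheck {
    // Called by PredictionIO during training; a failed requirement aborts the run.
    override def sanityCheck(): Unit = {
      require(ratings.count() > 0, "ratings cannot be empty")
      require(ratings.filter { case (_, s) => s < 0.0 }.count() == 0,
        "scores must be non-negative")
    }
  }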

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Utils.scala b/core/src/main/scala/org/apache/predictionio/controller/Utils.scala
new file mode 100644
index 0000000..e74fe4b
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Utils.scala
@@ -0,0 +1,69 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.workflow.KryoInstantiator
+
+import org.json4s._
+import org.json4s.ext.JodaTimeSerializers
+
+import scala.io.Source
+
+import _root_.java.io.File
+import _root_.java.io.FileOutputStream
+
+/** Controller utilities.
+  *
+  * @group Helper
+  */
+object Utils {
+  /** Default JSON4S serializers for PredictionIO controllers. */
+  val json4sDefaultFormats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+
+  /** Save a model object as a file to a temporary location on the local
+    * filesystem. It will first try to use the location indicated by the
+    * environment variable PIO_FS_TMPDIR, then fall back to the java.io.tmpdir
+    * property.
+    *
+    * @param id Used as the filename of the file.
+    * @param model Model object.
+    */
+  def save(id: String, model: Any): Unit = {
+    val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", System.getProperty("java.io.tmpdir"))
+    val modelFile = tmpdir + File.separator + id
+    (new File(tmpdir)).mkdirs
+    val fos = new FileOutputStream(modelFile)
+    val kryo = KryoInstantiator.newKryoInjection
+    fos.write(kryo(model))
+    fos.close
+  }
+
+  /** Load a model object from a file in a temporary location on the local
+    * filesystem. It will first try to use the location indicated by the
+    * environment variable PIO_FS_TMPDIR, then fall back to the java.io.tmpdir
+    * property.
+    *
+    * @param id Used as the filename of the file.
+    */
+  def load(id: String): Any = {
+    val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", System.getProperty("java.io.tmpdir"))
+    val modelFile = tmpdir + File.separator + id
+    val src = Source.fromFile(modelFile)(scala.io.Codec.ISO8859)
+    val kryo = KryoInstantiator.newKryoInjection
+    val m = kryo.invert(src.map(_.toByte).toArray).get
+    src.close
+    m
+  }
+}
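
For reference, a minimal sketch (not part of this patch) of the save/load round trip; the model class and the "my-model-id" identifier are hypothetical.

  import org.apache.predictionio.controller.Utils

  case class Centroids(values: Array[Double])

  object UtilsSketch {
    def roundTrip(): Centroids = {
      val model = Centroids(Array(0.1, 0.5, 0.9))
      // Written under PIO_FS_TMPDIR (or java.io.tmpdir), using the id as the
      // file name and Kryo for serialization.
      Utils.save("my-model-id", model)
      Utils.load("my-model-id").asInstanceOf[Centroids]
    }
  }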

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala
new file mode 100644
index 0000000..6ab5382
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala
@@ -0,0 +1,39 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.EngineParamsGenerator
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+/** Define an engine parameter generator in Java
+  *
+  * Implementations of this abstract class can be supplied to "pio eval" as the second
+  * command line argument.
+  *
+  * @group Evaluation
+  */
+abstract class JavaEngineParamsGenerator extends EngineParamsGenerator {
+
+  /** Set the list of [[EngineParams]].
+    *
+    * @param engineParams A list of engine params
+    */
+  def setEngineParamsList(engineParams: java.util.List[_ <: EngineParams]) {
+    engineParamsList = asScalaBuffer(engineParams)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala
new file mode 100644
index 0000000..7c9c984
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala
@@ -0,0 +1,66 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.controller.Metric
+import org.apache.predictionio.core.BaseEngine
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+/** Define an evaluation in Java.
+  *
+  * Implementations of this abstract class can be supplied to "pio eval" as the first
+  * argument.
+  *
+  * @group Evaluation
+  */
+
+abstract class JavaEvaluation extends Evaluation {
+  /** Set the [[BaseEngine]] and [[Metric]] for this [[Evaluation]]
+    *
+    * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]]
+    * @param metric [[Metric]] for this [[JavaEvaluation]]
+    * @tparam EI Evaluation information class
+    * @tparam Q Query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    */
+  def setEngineMetric[EI, Q, P, A](
+    baseEngine: BaseEngine[EI, Q, P, A],
+    metric: Metric[EI, Q, P, A, _]) {
+
+    engineMetric = (baseEngine, metric)
+  }
+
+  /** Set the [[BaseEngine]] and [[Metric]]s for this [[JavaEvaluation]]
+    *
+    * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]]
+    * @param metric [[Metric]] for this [[JavaEvaluation]]
+    * @param metrics Other [[Metric]]s for this [[JavaEvaluation]]
+    * @tparam EI Evaluation information class
+    * @tparam Q Query class
+    * @tparam P Predicted result class
+    * @tparam A Actual result class
+    */
+  def setEngineMetrics[EI, Q, P, A](
+    baseEngine: BaseEngine[EI, Q, P, A],
+    metric: Metric[EI, Q, P, A, _],
+    metrics: java.util.List[_ <: Metric[EI, Q, P, A, _]]) {
+
+    engineMetrics = (baseEngine, metric, asScalaBuffer(metrics))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala
new file mode 100644
index 0000000..41cbbff
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala
@@ -0,0 +1,31 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.LAlgorithm
+
+import scala.reflect.ClassTag
+
+/** Base class of a Java local algorithm. Refer to [[LAlgorithm]] for documentation.
+  *
+  * @tparam PD Prepared data class.
+  * @tparam M Trained model class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Algorithm
+  */
+abstract class LJavaAlgorithm[PD, M, Q, P]
+  extends LAlgorithm[PD, M, Q, P]()(ClassTag.AnyRef.asInstanceOf[ClassTag[M]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala
new file mode 100644
index 0000000..aca2ce6
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala
@@ -0,0 +1,31 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.LDataSource
+
+import scala.reflect.ClassTag
+
+/** Base class of a Java local data source. Refer to [[LDataSource]] for documentation.
+  *
+  * @tparam TD Training data class.
+  * @tparam EI Evaluation Info class.
+  * @tparam Q Input query class.
+  * @tparam A Actual value class.
+  * @group Data Source
+  */
+abstract class LJavaDataSource[TD, EI, Q, A]
+  extends LDataSource[TD, EI, Q, A]()(ClassTag.AnyRef.asInstanceOf[ClassTag[TD]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala
new file mode 100644
index 0000000..8def08b
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala
@@ -0,0 +1,29 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.LPreparator
+
+import scala.reflect.ClassTag
+
+/** Base class of a Java local preparator. Refer to [[LPreparator]] for documentation.
+  *
+  * @tparam TD Training data class.
+  * @tparam PD Prepared data class.
+  * @group Preparator
+  */
+abstract class LJavaPreparator[TD, PD]
+  extends LPreparator[TD, PD]()(ClassTag.AnyRef.asInstanceOf[ClassTag[PD]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala
new file mode 100644
index 0000000..ee380c3
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala
@@ -0,0 +1,26 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.LServing
+
+/** Base class of Java local serving. Refer to [[LServing]] for documentation.
+  *
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Serving
+  */
+abstract class LJavaServing[Q, P] extends LServing[Q, P]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala
new file mode 100644
index 0000000..f41903d
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala
@@ -0,0 +1,33 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.P2LAlgorithm
+
+import scala.reflect.ClassTag
+
+/** Base class of a Java parallel-to-local algorithm. Refer to [[P2LAlgorithm]] for documentation.
+  *
+  * @tparam PD Prepared data class.
+  * @tparam M Trained model class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Algorithm
+  */
+abstract class P2LJavaAlgorithm[PD, M, Q, P]
+  extends P2LAlgorithm[PD, M, Q, P]()(
+    ClassTag.AnyRef.asInstanceOf[ClassTag[M]],
+    ClassTag.AnyRef.asInstanceOf[ClassTag[Q]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala
new file mode 100644
index 0000000..38eaa70
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala
@@ -0,0 +1,28 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.PAlgorithm
+
+/** Base class of a Java parallel algorithm. Refer to [[PAlgorithm]] for documentation.
+  *
+  * @tparam PD Prepared data class.
+  * @tparam M Trained model class.
+  * @tparam Q Input query class.
+  * @tparam P Output prediction class.
+  * @group Algorithm
+  */
+abstract class PJavaAlgorithm[PD, M, Q, P] extends PAlgorithm[PD, M, Q, P]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala
new file mode 100644
index 0000000..cb04da6
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala
@@ -0,0 +1,28 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.PDataSource
+
+/** Base class of a Java parallel data source. Refer to [[PDataSource]] for documentation.
+  *
+  * @tparam TD Training data class.
+  * @tparam EI Evaluation Info class.
+  * @tparam Q Input query class.
+  * @tparam A Actual value class.
+  * @group Data Source
+  */
+abstract class PJavaDataSource[TD, EI, Q, A] extends PDataSource[TD, EI, Q, A]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala
new file mode 100644
index 0000000..5fb953f
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala
@@ -0,0 +1,26 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import org.apache.predictionio.controller.PPreparator
+
+/** Base class of a Java parallel preparator. Refer to [[PPreparator]] for documentation.
+  *
+  * @tparam TD Training data class.
+  * @tparam PD Prepared data class.
+  * @group Preparator
+  */
+abstract class PJavaPreparator[TD, PD] extends PPreparator[TD, PD]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala
new file mode 100644
index 0000000..970cd6c
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala
@@ -0,0 +1,20 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.controller.java
+
+import java.util.Comparator
+
+trait SerializableComparator[T] extends Comparator[T] with java.io.Serializable

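SerializableComparator simply combines java.util.Comparator with java.io.Serializable so that comparison logic written against Java APIs can be captured in Spark closures. A minimal sketch of a concrete implementation (the descending order is an arbitrary choice for illustration):

// A concrete SerializableComparator that orders boxed doubles in descending
// order; being Serializable, it can be shipped to Spark executors.
class DescendingDoubleComparator extends SerializableComparator[java.lang.Double] {
  override def compare(a: java.lang.Double, b: java.lang.Double): Int = b.compareTo(a)
}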
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/package.scala b/core/src/main/scala/org/apache/predictionio/controller/package.scala
new file mode 100644
index 0000000..35b1e81
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/package.scala
@@ -0,0 +1,168 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio
+
+/** Provides building blocks for writing a complete prediction engine
+  * consisting of DataSource, Preparator, Algorithm, Serving, and Evaluation.
+  *
+  * == Start Building an Engine ==
+  * The starting point of a prediction engine is the [[Engine]] class.
+  *
+  * == The DASE Paradigm ==
+  * The building blocks together form the DASE paradigm. Learn more about DASE
+  * [[http://docs.prediction.io/customize/ here]].
+  *
+  * == Types of Building Blocks ==
+  * Depending on the problem you are solving, you would need to pick appropriate
+  * flavors of building blocks.
+  *
+  * === Engines ===
+  * There are 3 typical engine configurations:
+  *
+  *  1. [[PDataSource]], [[PPreparator]], [[P2LAlgorithm]], [[LServing]]
+  *  2. [[PDataSource]], [[PPreparator]], [[PAlgorithm]], [[LServing]]
+  *  3. [[LDataSource]], [[LPreparator]], [[LAlgorithm]], [[LServing]]
+  *
+  * In both configurations 1 and 2, data is sourced and prepared in a
+  * parallelized fashion, with the data represented as RDDs.
+  *
+  * The difference between configurations 1 and 2 comes at the algorithm stage.
+  * In configuration 1, the algorithm operates on potentially large data as RDDs
+  * in the Spark cluster, and eventually outputs a model that is small enough to
+  * fit in a single machine.
+  *
+  * On the other hand, configuration 2 outputs a model that is potentially too
+  * large to fit in a single machine, and must reside in the Spark cluster as
+  * RDD(s).
+  *
+  * With configuration 1 ([[P2LAlgorithm]]), PredictionIO will automatically
+  * try to persist the model to local disk or HDFS if the model is serializable.
+  *
+  * With configuration 2 ([[PAlgorithm]]), PredictionIO will not automatically
+  * try to persist the model, unless the model implements the [[PersistentModel]]
+  * trait.
+  *
+  * In special circumstances where both the data and the model are small,
+  * configuration 3 may be used. Beware that RDDs cannot be used with
+  * configuration 3.
+  *
+  * === Data Source ===
+  * [[PDataSource]] is the most commonly used data source base class, with the
+  * ability to process RDD-based data. [[LDataSource]] '''cannot''' handle
+  * RDD-based data; use it only when you have a special requirement.
+  *
+  * === Preparator ===
+  * With [[PDataSource]], you must pick [[PPreparator]]. The same applies to
+  * [[LDataSource]] and [[LPreparator]].
+  *
+  * === Algorithm ===
+  * The workhorse of the engine comes in 3 different flavors.
+  *
+  * ==== P2LAlgorithm ====
+  * Produces a model that is small enough to fit in a single machine from
+  * [[PDataSource]] and [[PPreparator]]. The model '''cannot''' contain any RDD.
+  * If the produced model is serializable, PredictionIO will try to
+  * automatically persist it. In addition, P2LAlgorithm.batchPredict is
+  * already implemented for [[Evaluation]] purposes.
+  *
+  * ==== PAlgorithm ====
+  * Produces a model that could contain RDDs from [[PDataSource]] and
+  * [[PPreparator]]. PredictionIO will not try to persist it automatically
+  * unless the model implements [[PersistentModel]]. [[PAlgorithm.batchPredict]]
+  * must be implemented for [[Evaluation]].
+  *
+  * ==== LAlgorithm ====
+  * Produces a model that is small enough to fit in a single machine from
+  * [[LDataSource]] and [[LPreparator]]. The model '''cannot''' contain any RDD.
+  * If the produced model is serializable, PredictionIO will try to
+  * automatically persist it. In addition, LAlgorithm.batchPredict is
+  * already implemented for [[Evaluation]] purposes.
+  *
+  * === Serving ===
+  * The serving component comes with only 1 flavor--[[LServing]]. At the serving
+  * stage, it is assumed that the result being served is already at a human-
+  * consumable size.
+  *
+  * == Model Persistence ==
+  * PredictionIO tries its best to persist trained models automatically. Please
+  * refer to [[LAlgorithm.makePersistentModel]],
+  * [[P2LAlgorithm.makePersistentModel]], and [[PAlgorithm.makePersistentModel]]
+  * for descriptions on different strategies.
+  */
+package object controller {
+
+  /** Base class of several helper types that represent emptiness
+    *
+    * @group Helper
+    */
+  class SerializableClass() extends Serializable
+
+  /** Empty data source parameters.
+    * @group Helper
+    */
+  type EmptyDataSourceParams = EmptyParams
+
+  /** Empty data parameters.
+    * @group Helper
+    */
+  type EmptyDataParams = EmptyParams
+
+  /** Empty evaluation info.
+    * @group Helper
+    */
+  type EmptyEvaluationInfo = SerializableClass
+
+  /** Empty preparator parameters.
+    * @group Helper
+    */
+  type EmptyPreparatorParams = EmptyParams
+
+  /** Empty algorithm parameters.
+    * @group Helper
+    */
+  type EmptyAlgorithmParams = EmptyParams
+
+  /** Empty serving parameters.
+    * @group Helper
+    */
+  type EmptyServingParams = EmptyParams
+
+  /** Empty metrics parameters.
+    * @group Helper
+    */
+  type EmptyMetricsParams = EmptyParams
+
+  /** Empty training data.
+    * @group Helper
+    */
+  type EmptyTrainingData = SerializableClass
+
+  /** Empty prepared data.
+    * @group Helper
+    */
+  type EmptyPreparedData = SerializableClass
+
+  /** Empty model.
+    * @group Helper
+    */
+  type EmptyModel = SerializableClass
+
+  /** Empty actual result.
+    * @group Helper
+    */
+  type EmptyActualResult = SerializableClass
+
+}

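The empty helper aliases above let engine components declare unused slots explicitly. Below is a minimal sketch of how they might appear in a component signature; MyTrainingData, MyQuery, and MyDataSource are hypothetical names, and nothing beyond the aliases and the PDataSource class referenced above is assumed.

import org.apache.predictionio.controller._

object EmptyTypesSketch {
  // Placeholder classes introduced only for this illustration.
  class MyTrainingData extends Serializable
  class MyQuery extends Serializable

  // A hypothetical data source that opts out of evaluation by plugging the
  // empty aliases into the evaluation-info and actual-result slots.
  abstract class MyDataSource
    extends PDataSource[MyTrainingData, EmptyEvaluationInfo, MyQuery, EmptyActualResult]

  // The empty data aliases resolve to SerializableClass, so a plain instance
  // satisfies any of them.
  val noTrainingData: EmptyTrainingData = new SerializableClass()
}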
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala b/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala
new file mode 100644
index 0000000..5f90d99
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala
@@ -0,0 +1,66 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.core
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.controller.Params
+
+/** :: DeveloperApi ::
+  * Base class for all controllers
+  */
+@DeveloperApi
+abstract class AbstractDoer extends Serializable
+
+/** :: DeveloperApi ::
+  * Provides a facility to instantiate controller classes
+  */
+@DeveloperApi
+object Doer extends Logging {
+  /** :: DeveloperApi ::
+    * Instantiates a controller class using supplied controller parameters as
+    * constructor parameters
+    *
+    * @param cls Class of the controller class
+    * @param params Parameters of the controller class
+    * @tparam C Controller class
+    * @return An instance of the controller class
+    */
+  @DeveloperApi
+  def apply[C <: AbstractDoer] (
+    cls: Class[_ <: C], params: Params): C = {
+
+    // Subclasses only allow two kinds of constructors:
+    // 1. A constructor taking P <: Params.
+    // 2. An empty constructor.
+    // First try (1); if that fails, try (2).
+    try {
+      val constr = cls.getConstructor(params.getClass)
+      constr.newInstance(params)
+    } catch {
+      case e: NoSuchMethodException => try {
+        val zeroConstr = cls.getConstructor()
+        zeroConstr.newInstance()
+      } catch {
+        case e: NoSuchMethodException =>
+          error(s"${params.getClass.getName} was used as the constructor " +
+            s"argument to ${e.getMessage}, but no constructor can handle it. " 
+
+            "Aborting.")
+          sys.exit(1)
+      }
+    }
+  }
+}

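Doer.apply first looks for a constructor that accepts the supplied parameter object and, failing that, falls back to a zero-argument constructor. The sketch below shows the two controller shapes it can instantiate; MyParams and both controller classes are hypothetical, and it assumes Params can be extended by a simple case class, as engine parameter classes normally are.

import org.apache.predictionio.controller.Params
import org.apache.predictionio.core.{AbstractDoer, Doer}

// Hypothetical parameter and controller classes, used only to illustrate the
// two constructor shapes that Doer.apply looks for.
case class MyParams(n: Int) extends Params

class ParamHungryController(val params: MyParams) extends AbstractDoer
class ParamlessController extends AbstractDoer

object DoerSketch {
  def main(args: Array[String]): Unit = {
    // Shape (1): a constructor taking exactly the params' runtime class.
    val withParams = Doer(classOf[ParamHungryController], MyParams(5))
    // Shape (2): no matching constructor, so Doer falls back to the
    // zero-argument constructor.
    val withoutParams = Doer(classOf[ParamlessController], MyParams(5))
    println(s"$withParams / $withoutParams")
  }
}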
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala
new file mode 100644
index 0000000..7774861
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala
@@ -0,0 +1,123 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.core
+
+import com.google.gson.TypeAdapterFactory
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.controller.Utils
+import net.jodah.typetools.TypeResolver
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** :: DeveloperApi ::
+  * Base trait with default custom query serializer, exposed to engine developers
+  * via [[org.apache.predictionio.controller.CustomQuerySerializer]]
+  */
+@DeveloperApi
+trait BaseQuerySerializer {
+  /** :: DeveloperApi ::
+    * Serializer for Scala query classes using
+    * [[org.apache.predictionio.controller.Utils.json4sDefaultFormats]]
+    */
+  @DeveloperApi
+  @transient lazy val querySerializer = Utils.json4sDefaultFormats
+
+  /** :: DeveloperApi ::
+    * Serializer for Java query classes using Gson
+    */
+  @DeveloperApi
+  @transient lazy val gsonTypeAdapterFactories = Seq.empty[TypeAdapterFactory]
+}
+
+/** :: DeveloperApi ::
+  * Base class of all algorithm controllers
+  *
+  * @tparam PD Prepared data class
+  * @tparam M Model class
+  * @tparam Q Query class
+  * @tparam P Predicted result class
+  */
+@DeveloperApi
+abstract class BaseAlgorithm[PD, M, Q, P]
+  extends AbstractDoer with BaseQuerySerializer {
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly. This is called by workflow
+    * to train a model.
+    *
+    * @param sc Spark context
+    * @param pd Prepared data
+    * @return Trained model
+    */
+  @DeveloperApi
+  def trainBase(sc: SparkContext, pd: PD): M
+
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly. This is called by
+    * evaluation workflow to perform batch prediction.
+    *
+    * @param sc Spark context
+    * @param bm Model
+    * @param qs Batch of queries
+    * @return Batch of predicted results
+    */
+  @DeveloperApi
+  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
+  : RDD[(Long, P)]
+
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly. Called by serving to
+    * perform a single prediction.
+    *
+    * @param bm Model
+    * @param q Query
+    * @return Predicted result
+    */
+  @DeveloperApi
+  def predictBase(bm: Any, q: Q): P
+
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly. Prepare a model for
+    * persistence in the downstream consumer. PredictionIO supports 3 types of
+    * model persistence: automatic persistence, manual persistence, and
+    * re-training on deployment. This method provides a way for downstream
+    * modules to determine in which mode the model should be persisted.
+    *
+    * @param sc Spark context
+    * @param modelId Model ID
+    * @param algoParams Algorithm parameters that trained this model
+    * @param bm Model
+    * @return The model itself for automatic persistence, an instance of
+    *         [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual
+    *         persistence, or Unit for re-training on deployment
+    */
+  @DeveloperApi
+  def makePersistentModel(
+    sc: SparkContext,
+    modelId: String,
+    algoParams: Params,
+    bm: Any): Any = Unit
+
+  /** :: DeveloperApi ::
+    * Obtains the type signature of query for this algorithm
+    *
+    * @return Type signature of query
+    */
+  def queryClass: Class[Q] = {
+    val types = TypeResolver.resolveRawArguments(classOf[BaseAlgorithm[PD, M, Q, P]], getClass)
+    types(2).asInstanceOf[Class[Q]]
+  }
+}

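Below is a minimal sketch of a concrete BaseAlgorithm subclass, mainly to show how queryClass recovers the erased query type from the subclass's type arguments; ConstantAlgorithm and MeanQuery are hypothetical and the training logic is deliberately trivial.

import org.apache.predictionio.core.BaseAlgorithm
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical query class used only for this illustration.
class MeanQuery extends Serializable

// A trivially simple algorithm: the "model" is the mean of the prepared data,
// and every prediction returns that mean.
class ConstantAlgorithm extends BaseAlgorithm[Seq[Double], Double, MeanQuery, Double] {

  override def trainBase(sc: SparkContext, pd: Seq[Double]): Double =
    if (pd.isEmpty) 0.0 else pd.sum / pd.size

  override def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, MeanQuery)])
  : RDD[(Long, Double)] = qs.map { case (i, _) => (i, bm.asInstanceOf[Double]) }

  override def predictBase(bm: Any, q: MeanQuery): Double = bm.asInstanceOf[Double]
}

// Because ConstantAlgorithm fixes Q to MeanQuery, queryClass resolves to
// classOf[MeanQuery] through TypeResolver:
//   new ConstantAlgorithm().queryClass == classOf[MeanQuery]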
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala
new file mode 100644
index 0000000..96e2548
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala
@@ -0,0 +1,52 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.core
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** :: DeveloperApi ::
+  * Base class of all data source controllers
+  *
+  * @tparam TD Training data class
+  * @tparam EI Evaluation information class
+  * @tparam Q Query class
+  * @tparam A Actual result class
+  */
+@DeveloperApi
+abstract class BaseDataSource[TD, EI, Q, A] extends AbstractDoer {
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly. This is called by workflow
+    * to read training data.
+    *
+    * @param sc Spark context
+    * @return Training data
+    */
+  @DeveloperApi
+  def readTrainingBase(sc: SparkContext): TD
+
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly. This is called by
+    * evaluation workflow to read training and validation data.
+    *
+    * @param sc Spark context
+    * @return Sets of training data, evaluation information, queries, and actual
+    *         results
+    */
+  @DeveloperApi
+  def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])]
+}

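A minimal sketch of a concrete data source implementing both read paths; ToyDataSource and its hard-coded numbers are purely illustrative.

import org.apache.predictionio.core.BaseDataSource
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// A hypothetical, self-contained data source showing the two read paths:
// readTrainingBase for training and readEvalBase for evaluation.
class ToyDataSource extends BaseDataSource[RDD[Double], String, Double, Double] {

  private def raw(sc: SparkContext): RDD[Double] =
    sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))

  override def readTrainingBase(sc: SparkContext): RDD[Double] = raw(sc)

  // One evaluation fold: the "query" is the value itself and the "actual"
  // result is its double, purely for illustration.
  override def readEvalBase(sc: SparkContext): Seq[(RDD[Double], String, RDD[(Double, Double)])] =
    Seq((raw(sc), "toy-fold", raw(sc).map(x => (x, 2 * x))))
}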
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala b/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala
new file mode 100644
index 0000000..6dfce7d
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala
@@ -0,0 +1,100 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.core
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.json4s.JValue
+
+/** :: DeveloperApi ::
+  * Base class of all engine controller classes
+  *
+  * @tparam EI Evaluation information class
+  * @tparam Q Query class
+  * @tparam P Predicted result class
+  * @tparam A Actual result class
+  */
+@DeveloperApi
+abstract class BaseEngine[EI, Q, P, A] extends Serializable {
+  /** :: DeveloperApi ::
+    * Implement this method so that training this engine would return a list of
+    * models.
+    *
+    * @param sc An instance of SparkContext.
+    * @param engineParams An instance of [[EngineParams]] for running a single training.
+    * @param params An instance of [[WorkflowParams]] that controls the workflow.
+    * @return A list of models.
+    */
+  @DeveloperApi
+  def train(
+    sc: SparkContext,
+    engineParams: EngineParams,
+    engineInstanceId: String,
+    params: WorkflowParams): Seq[Any]
+
+  /** :: DeveloperApi ::
+    * Implement this method so that [[org.apache.predictionio.controller.Evaluation]] can
+    * use it to generate inputs for [[org.apache.predictionio.controller.Metric]].
+    *
+    * @param sc An instance of SparkContext.
+    * @param engineParams An instance of [[EngineParams]] for running a single evaluation.
+    * @param params An instance of [[WorkflowParams]] that controls the workflow.
+    * @return A list of evaluation information and RDD of query, predicted
+    *         result, and actual result tuples.
+    */
+  @DeveloperApi
+  def eval(
+    sc: SparkContext,
+    engineParams: EngineParams,
+    params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])]
+
+  /** :: DeveloperApi ::
+    * Override this method to further optimize the process that runs multiple
+    * evaluations (during tuning, for example). By default, this method calls
+    * [[eval]] for each element in the engine parameters list.
+    *
+    * @param sc An instance of SparkContext.
+    * @param engineParamsList A list of [[EngineParams]] for running batch evaluation.
+    * @param params An instance of [[WorkflowParams]] that controls the workflow.
+    * @return A list of engine parameters and evaluation result (from [[eval]]) tuples.
+    */
+  @DeveloperApi
+  def batchEval(
+    sc: SparkContext,
+    engineParamsList: Seq[EngineParams],
+    params: WorkflowParams)
+  : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
+    engineParamsList.map { engineParams =>
+      (engineParams, eval(sc, engineParams, params))
+    }
+  }
+
+  /** :: DeveloperApi ::
+    * Implement this method to convert a JValue (read from an engine variant
+    * JSON file) to an instance of [[EngineParams]].
+    *
+    * @param variantJson Content of the engine variant JSON as JValue.
+    * @param jsonExtractor The JSON extractor option to use when converting the JSON.
+    * @return An instance of [[EngineParams]] converted from JSON.
+    */
+  @DeveloperApi
+  def jValueToEngineParams(variantJson: JValue, jsonExtractor: JsonExtractorOption): EngineParams =
+    throw new NotImplementedError("JSON to EngineParams is not implemented.")
+}

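A minimal sketch of a BaseEngine subclass, only to make the required shapes of train and eval concrete; ToyEngine, its constant model, and the numbers are hypothetical.

import org.apache.predictionio.controller.EngineParams
import org.apache.predictionio.core.BaseEngine
import org.apache.predictionio.workflow.WorkflowParams
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// A hypothetical skeletal engine: training yields one constant "model" and
// evaluation yields one fold of (query, predicted, actual) triples.
class ToyEngine extends BaseEngine[String, Double, Double, Double] {

  override def train(
    sc: SparkContext,
    engineParams: EngineParams,
    engineInstanceId: String,
    params: WorkflowParams): Seq[Any] = Seq(42.0)

  override def eval(
    sc: SparkContext,
    engineParams: EngineParams,
    params: WorkflowParams): Seq[(String, RDD[(Double, Double, Double)])] = {
    val qpa = sc.parallelize(Seq((1.0, 42.0, 40.0), (2.0, 42.0, 44.0)))
    Seq(("toy-fold", qpa))
  }
}

With such an engine, batchEval pairs each EngineParams in the supplied list with the single fold produced by eval, per the default implementation shown above.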
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala b/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala
new file mode 100644
index 0000000..71a086d
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala
@@ -0,0 +1,72 @@
+/** Copyright 2015 TappingStone, Inc.
+  *
+  * Licensed under the Apache License, Version 2.0 (the "License");
+  * you may not use this file except in compliance with the License.
+  * You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package org.apache.predictionio.core
+
+import org.apache.predictionio.annotation.DeveloperApi
+import org.apache.predictionio.annotation.Experimental
+import org.apache.predictionio.controller.EngineParams
+import org.apache.predictionio.controller.Evaluation
+import org.apache.predictionio.workflow.WorkflowParams
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** :: DeveloperApi ::
+  * Base class of all evaluator controller classes
+  *
+  * @tparam EI Evaluation information class
+  * @tparam Q Query class
+  * @tparam P Predicted result class
+  * @tparam A Actual result class
+  * @tparam ER Evaluation result class
+  */
+@DeveloperApi
+abstract class BaseEvaluator[EI, Q, P, A, ER <: BaseEvaluatorResult]
+  extends AbstractDoer {
+  /** :: DeveloperApi ::
+    * Engine developers should not use this directly. This is called by
+    * evaluation workflow to perform evaluation.
+    *
+    * @param sc Spark context
+    * @param evaluation Evaluation to run
+    * @param engineEvalDataSet Sets of engine parameters and data for evaluation
+    * @param params Evaluation workflow parameters
+    * @return Evaluation result
+    */
+  @DeveloperApi
+  def evaluateBase(
+    sc: SparkContext,
+    evaluation: Evaluation,
+    engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])],
+    params: WorkflowParams): ER
+}
+
+/** Base trait of evaluator result */
+trait BaseEvaluatorResult extends Serializable {
+  /** A short description of the result */
+  def toOneLiner(): String = ""
+
+  /** HTML portion of the rendered evaluator results */
+  def toHTML(): String = ""
+
+  /** JSON portion of the rendered evaluator results */
+  def toJSON(): String = ""
+
+  /** :: Experimental ::
+    * Indicates whether this result is inserted into the database
+    */
+  @Experimental
+  val noSave: Boolean = false
+}

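A minimal sketch of a concrete evaluator result wiring up the rendering hooks above; ScoreResult and its JSON shape are hypothetical.

import org.apache.predictionio.core.BaseEvaluatorResult

// A hypothetical evaluator result carrying a single score and rendering it
// through the hooks on BaseEvaluatorResult; the JSON is hand-rolled to keep
// the sketch self-contained.
case class ScoreResult(score: Double) extends BaseEvaluatorResult {
  override def toOneLiner(): String = f"score = $score%.4f"
  override def toHTML(): String = s"<p>Score: $score</p>"
  override def toJSON(): String = s"""{"score": $score}"""
  // Keep this result out of the database, e.g. for throwaway local runs.
  override val noSave: Boolean = true
}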