http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Metric.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/Metric.scala b/core/src/main/scala/org/apache/predictionio/controller/Metric.scala
new file mode 100644
index 0000000..cc27984
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/Metric.scala
@@ -0,0 +1,266 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import _root_.org.apache.predictionio.controller.java.SerializableComparator
+import org.apache.predictionio.core.BaseEngine
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.StatCounter
+
+import scala.Numeric.Implicits._
+import scala.reflect._
+
+/** Base class of a [[Metric]].
+ *
+ * @tparam EI Evaluation information
+ * @tparam Q Query
+ * @tparam P Predicted result
+ * @tparam A Actual result
+ * @tparam R Metric result
+ * @group Evaluation
+ */
+abstract class Metric[EI, Q, P, A, R](implicit rOrder: Ordering[R])
+extends Serializable {
+ /** Java friendly constructor
+ *
+ * @param comparator A serializable comparator for sorting the metric results.
+ *
+ */
+ def this(comparator: SerializableComparator[R]) = {
+ this()(Ordering.comparatorToOrdering(comparator))
+ }
+
+ /** Class name of this [[Metric]]. */
+ def header: String = this.getClass.getSimpleName
+
+ /** Calculates the result of this [[Metric]]. */
+ def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): R
+
+ /** Comparison function for R's ordering. */
+ def compare(r0: R, r1: R): Int = rOrder.compare(r0, r1)
+}
+
+private [predictionio] trait StatsMetricHelper[EI, Q, P, A] {
+ def calculate(q: Q, p: P, a: A): Double
+
+ def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+ : StatCounter = {
+ val doubleRDD = sc.union(
+ evalDataSet.map { case (_, qpaRDD) =>
+ qpaRDD.map { case (q, p, a) => calculate(q, p, a) }
+ }
+ )
+
+ doubleRDD.stats()
+ }
+}
+
+private [predictionio] trait StatsOptionMetricHelper[EI, Q, P, A] {
+ def calculate(q: Q, p: P, a: A): Option[Double]
+
+ def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
+ : StatCounter = {
+ val doubleRDD = sc.union(
+ evalDataSet.map { case (_, qpaRDD) =>
+ qpaRDD.flatMap { case (q, p, a) => calculate(q, p, a) }
+ }
+ )
+
+ doubleRDD.stats()
+ }
+}
+
+/** Returns the global average of the score returned by the calculate method.
+ * + * @tparam EI Evaluation information + * @tparam Q Query + * @tparam P Predicted result + * @tparam A Actual result + * + * @group Evaluation + */ +abstract class AverageMetric[EI, Q, P, A] + extends Metric[EI, Q, P, A, Double] + with StatsMetricHelper[EI, Q, P, A] + with QPAMetric[Q, P, A, Double] { + /** Implement this method to return a score that will be used for averaging + * across all QPA tuples. + */ + def calculate(q: Q, p: P, a: A): Double + + def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]) + : Double = { + calculateStats(sc, evalDataSet).mean + } +} + +/** Returns the global average of the non-None score returned by the calculate + * method. + * + * @tparam EI Evaluation information + * @tparam Q Query + * @tparam P Predicted result + * @tparam A Actual result + * + * @group Evaluation + */ +abstract class OptionAverageMetric[EI, Q, P, A] + extends Metric[EI, Q, P, A, Double] + with StatsOptionMetricHelper[EI, Q, P, A] + with QPAMetric[Q, P, A, Option[Double]] { + /** Implement this method to return a score that will be used for averaging + * across all QPA tuples. + */ + def calculate(q: Q, p: P, a: A): Option[Double] + + def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]) + : Double = { + calculateStats(sc, evalDataSet).mean + } +} + +/** Returns the global standard deviation of the score returned by the calculate method + * + * This method uses org.apache.spark.util.StatCounter library, a one pass + * method is used for calculation + * + * @tparam EI Evaluation information + * @tparam Q Query + * @tparam P Predicted result + * @tparam A Actual result + * + * @group Evaluation + */ +abstract class StdevMetric[EI, Q, P, A] + extends Metric[EI, Q, P, A, Double] + with StatsMetricHelper[EI, Q, P, A] + with QPAMetric[Q, P, A, Double] { + /** Implement this method to return a score that will be used for calculating + * the stdev + * across all QPA tuples. + */ + def calculate(q: Q, p: P, a: A): Double + + def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]) + : Double = { + calculateStats(sc, evalDataSet).stdev + } +} + +/** Returns the global standard deviation of the non-None score returned by the calculate method + * + * This method uses org.apache.spark.util.StatCounter library, a one pass + * method is used for calculation + * + * @tparam EI Evaluation information + * @tparam Q Query + * @tparam P Predicted result + * @tparam A Actual result + * + * @group Evaluation + */ +abstract class OptionStdevMetric[EI, Q, P, A] + extends Metric[EI, Q, P, A, Double] + with StatsOptionMetricHelper[EI, Q, P, A] + with QPAMetric[Q, P, A, Option[Double]] { + /** Implement this method to return a score that will be used for calculating + * the stdev + * across all QPA tuples. + */ + def calculate(q: Q, p: P, a: A): Option[Double] + + def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]) + : Double = { + calculateStats(sc, evalDataSet).stdev + } +} + +/** Returns the sum of the score returned by the calculate method. + * + * @tparam EI Evaluation information + * @tparam Q Query + * @tparam P Predicted result + * @tparam A Actual result + * @tparam R Result, output of the function calculate, must be Numeric + * + * @group Evaluation + */ +abstract class SumMetric[EI, Q, P, A, R: ClassTag](implicit num: Numeric[R]) + extends Metric[EI, Q, P, A, R]()(num) + with QPAMetric[Q, P, A, R] { + /** Implement this method to return a score that will be used for summing + * across all QPA tuples. 
+ */ + def calculate(q: Q, p: P, a: A): R + + def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]) + : R = { + val union: RDD[R] = sc.union( + evalDataSet.map { case (_, qpaRDD) => + qpaRDD.map { case (q, p, a) => calculate(q, p, a) } + } + ) + + union.aggregate[R](num.zero)(_ + _, _ + _) + } +} + +/** Returns zero. Useful as a placeholder during evaluation development when not all components are + * implemented. + * @tparam EI Evaluation information + * @tparam Q Query + * @tparam P Predicted result + * @tparam A Actual result + * + * @group Evaluation + */ +class ZeroMetric[EI, Q, P, A] extends Metric[EI, Q, P, A, Double]() { + def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): Double = 0.0 +} + +/** Companion object of [[ZeroMetric]] + * + * @group Evaluation + */ +object ZeroMetric { + /** Returns a ZeroMetric instance using Engine's type parameters. */ + def apply[EI, Q, P, A](engine: BaseEngine[EI, Q, P, A]): ZeroMetric[EI, Q, P, A] = { + new ZeroMetric[EI, Q, P, A]() + } +} + + +/** Trait for metric which returns a score based on Query, PredictedResult, + * and ActualResult + * + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + * @tparam R Metric result class + * @group Evaluation + */ +trait QPAMetric[Q, P, A, R] { + /** Calculate a metric result based on query, predicted result, and actual + * result + * + * @param q Query + * @param p Predicted result + * @param a Actual result + * @return Metric result + */ + def calculate(q: Q, p: P, a: A): R +}
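For a concrete sense of how the classes above are used: a custom metric normally implements only the per-tuple calculate(q, p, a) method and lets the base class handle aggregation across the evaluation data set. A minimal sketch, assuming hypothetical engine-specific Query, PredictedResult, and ActualResult classes:

import org.apache.predictionio.controller.AverageMetric

// Hypothetical engine classes, shown only for illustration.
case class Query(user: String)
case class PredictedResult(label: String)
case class ActualResult(label: String)

// Scores 1.0 when the predicted label matches the actual one, 0.0 otherwise;
// AverageMetric averages these scores over every (Q, P, A) tuple, yielding a
// simple accuracy measure.
class Accuracy[EI] extends AverageMetric[EI, Query, PredictedResult, ActualResult] {
  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double =
    if (p.label == a.label) 1.0 else 0.0
}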
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala new file mode 100644 index 0000000..b707041 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala @@ -0,0 +1,260 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import _root_.java.io.File +import _root_.java.io.PrintWriter + +import com.github.nscala_time.time.Imports.DateTime +import grizzled.slf4j.Logger +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.core.BaseEvaluator +import org.apache.predictionio.core.BaseEvaluatorResult +import org.apache.predictionio.data.storage.Storage +import org.apache.predictionio.workflow.JsonExtractor +import org.apache.predictionio.workflow.JsonExtractorOption.Both +import org.apache.predictionio.workflow.NameParamsSerializer +import org.apache.predictionio.workflow.WorkflowParams +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.json4s.native.Serialization.write +import org.json4s.native.Serialization.writePretty + +import scala.language.existentials + +/** Case class storing a primary score, and other scores + * + * @param score Primary metric score + * @param otherScores Other scores this metric might have + * @tparam R Type of the primary metric score + * @group Evaluation + */ +case class MetricScores[R]( + score: R, + otherScores: Seq[Any]) + +/** Contains all results of a [[MetricEvaluator]] + * + * @param bestScore The best score among all iterations + * @param bestEngineParams The set of engine parameters that yielded the best score + * @param bestIdx The index of iteration that yielded the best score + * @param metricHeader Brief description of the primary metric score + * @param otherMetricHeaders Brief descriptions of other metric scores + * @param engineParamsScores All sets of engine parameters and corresponding metric scores + * @param outputPath An optional output path where scores are saved + * @tparam R Type of the primary metric score + * @group Evaluation + */ +case class MetricEvaluatorResult[R]( + bestScore: MetricScores[R], + bestEngineParams: EngineParams, + bestIdx: Int, + metricHeader: String, + otherMetricHeaders: Seq[String], + engineParamsScores: Seq[(EngineParams, MetricScores[R])], + outputPath: Option[String]) +extends BaseEvaluatorResult { + + override def toOneLiner(): String = { + val idx = engineParamsScores.map(_._1).indexOf(bestEngineParams) + s"Best Params Index: $idx Score: ${bestScore.score}" + } + + override def toJSON(): String = { + implicit lazy val formats = Utils.json4sDefaultFormats + + new 
NameParamsSerializer
+ write(this)
+ }
+
+ override def toHTML(): String = html.metric_evaluator().toString()
+
+ override def toString: String = {
+ implicit lazy val formats = Utils.json4sDefaultFormats +
+ new NameParamsSerializer
+
+ val bestEPStr = JsonExtractor.engineParamstoPrettyJson(Both, bestEngineParams)
+
+ val strings = Seq(
+ "MetricEvaluatorResult:",
+ s" # engine params evaluated: ${engineParamsScores.size}") ++
+ Seq(
+ "Optimal Engine Params:",
+ s" $bestEPStr",
+ "Metrics:",
+ s" $metricHeader: ${bestScore.score}") ++
+ otherMetricHeaders.zip(bestScore.otherScores).map {
+ case (h, s) => s" $h: $s"
+ } ++
+ outputPath.toSeq.map {
+ p => s"The best variant params can be found in $p"
+ }
+
+ strings.mkString("\n")
+ }
+}
+
+/** Companion object of [[MetricEvaluator]]
+ *
+ * @group Evaluation
+ */
+object MetricEvaluator {
+ def apply[EI, Q, P, A, R](
+ metric: Metric[EI, Q, P, A, R],
+ otherMetrics: Seq[Metric[EI, Q, P, A, _]],
+ outputPath: String): MetricEvaluator[EI, Q, P, A, R] = {
+ new MetricEvaluator[EI, Q, P, A, R](
+ metric,
+ otherMetrics,
+ Some(outputPath))
+ }
+
+ def apply[EI, Q, P, A, R](
+ metric: Metric[EI, Q, P, A, R],
+ otherMetrics: Seq[Metric[EI, Q, P, A, _]])
+ : MetricEvaluator[EI, Q, P, A, R] = {
+ new MetricEvaluator[EI, Q, P, A, R](
+ metric,
+ otherMetrics,
+ None)
+ }
+
+ def apply[EI, Q, P, A, R](metric: Metric[EI, Q, P, A, R])
+ : MetricEvaluator[EI, Q, P, A, R] = {
+ new MetricEvaluator[EI, Q, P, A, R](
+ metric,
+ Seq[Metric[EI, Q, P, A, _]](),
+ None)
+ }
+
+ case class NameParams(name: String, params: Params) {
+ def this(np: (String, Params)) = this(np._1, np._2)
+ }
+
+ case class EngineVariant(
+ id: String,
+ description: String,
+ engineFactory: String,
+ datasource: NameParams,
+ preparator: NameParams,
+ algorithms: Seq[NameParams],
+ serving: NameParams) {
+
+ def this(evaluation: Evaluation, engineParams: EngineParams) = this(
+ id = "",
+ description = "",
+ engineFactory = evaluation.getClass.getName,
+ datasource = new NameParams(engineParams.dataSourceParams),
+ preparator = new NameParams(engineParams.preparatorParams),
+ algorithms = engineParams.algorithmParamsList.map(np => new NameParams(np)),
+ serving = new NameParams(engineParams.servingParams))
+ }
+}
+
+/** :: DeveloperApi ::
+ * Do not use this directly. Use [[MetricEvaluator$]] instead. This is an
+ * implementation of [[org.apache.predictionio.core.BaseEvaluator]] that evaluates
+ * prediction performance based on metric scores.
+ * + * @param metric Primary metric + * @param otherMetrics Other metrics + * @param outputPath Optional output path to save evaluation results + * @tparam EI Evaluation information type + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + * @tparam R Metric result class + * @group Evaluation + */ +@DeveloperApi +class MetricEvaluator[EI, Q, P, A, R] ( + val metric: Metric[EI, Q, P, A, R], + val otherMetrics: Seq[Metric[EI, Q, P, A, _]], + val outputPath: Option[String]) + extends BaseEvaluator[EI, Q, P, A, MetricEvaluatorResult[R]] { + @transient lazy val logger = Logger[this.type] + @transient val engineInstances = Storage.getMetaDataEngineInstances() + + def saveEngineJson( + evaluation: Evaluation, + engineParams: EngineParams, + outputPath: String) { + + val now = DateTime.now + val evalClassName = evaluation.getClass.getName + + val variant = MetricEvaluator.EngineVariant( + id = s"$evalClassName $now", + description = "", + engineFactory = evalClassName, + datasource = new MetricEvaluator.NameParams(engineParams.dataSourceParams), + preparator = new MetricEvaluator.NameParams(engineParams.preparatorParams), + algorithms = engineParams.algorithmParamsList.map(np => new MetricEvaluator.NameParams(np)), + serving = new MetricEvaluator.NameParams(engineParams.servingParams)) + + implicit lazy val formats = Utils.json4sDefaultFormats + + logger.info(s"Writing best variant params to disk ($outputPath)...") + val writer = new PrintWriter(new File(outputPath)) + writer.write(writePretty(variant)) + writer.close() + } + + def evaluateBase( + sc: SparkContext, + evaluation: Evaluation, + engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])], + params: WorkflowParams): MetricEvaluatorResult[R] = { + + val evalResultList: Seq[(EngineParams, MetricScores[R])] = engineEvalDataSet + .zipWithIndex + .par + .map { case ((engineParams, evalDataSet), idx) => + val metricScores = MetricScores[R]( + metric.calculate(sc, evalDataSet), + otherMetrics.map(_.calculate(sc, evalDataSet))) + (engineParams, metricScores) + } + .seq + + implicit lazy val formats = Utils.json4sDefaultFormats + + new NameParamsSerializer + + evalResultList.zipWithIndex.foreach { case ((ep, r), idx) => + logger.info(s"Iteration $idx") + logger.info(s"EngineParams: ${JsonExtractor.engineParamsToJson(Both, ep)}") + logger.info(s"Result: $r") + } + + // use max. take implicit from Metric. + val ((bestEngineParams, bestScore), bestIdx) = evalResultList + .zipWithIndex + .reduce { (x, y) => + if (metric.compare(x._1._2.score, y._1._2.score) >= 0) x else y + } + + // save engine params if it is set. 
+ outputPath.foreach { path => saveEngineJson(evaluation, bestEngineParams, path) } + + MetricEvaluatorResult( + bestScore = bestScore, + bestEngineParams = bestEngineParams, + bestIdx = bestIdx, + metricHeader = metric.header, + otherMetricHeaders = otherMetrics.map(_.header), + engineParamsScores = evalResultList, + outputPath = outputPath) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala new file mode 100644 index 0000000..cb9f7c4 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala @@ -0,0 +1,121 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import _root_.org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.core.BaseAlgorithm +import org.apache.predictionio.workflow.PersistentModelManifest +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import scala.reflect._ + +/** Base class of a parallel-to-local algorithm. + * + * A parallel-to-local algorithm can be run in parallel on a cluster and + * produces a model that can fit within a single machine. + * + * If your input query class requires custom JSON4S serialization, the most + * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]], + * and mix that into your algorithm class, instead of overriding + * [[querySerializer]] directly. + * + * @tparam PD Prepared data class. + * @tparam M Trained model class. + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Algorithm + */ +abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P] + extends BaseAlgorithm[PD, M, Q, P] { + + def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd) + + /** Implement this method to produce a model from prepared data. + * + * @param pd Prepared data for model training. + * @return Trained model. + */ + def train(sc: SparkContext, pd: PD): M + + def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]) + : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs) + + /** This is a default implementation to perform batch prediction. Override + * this method for a custom implementation. + * + * @param m A model + * @param qs An RDD of index-query tuples. The index is used to keep track of + * predicted results with corresponding queries. 
+ * @return Batch of predicted results + */ + def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = { + qs.mapValues { q => predict(m, q) } + } + + def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q) + + /** Implement this method to produce a prediction from a query and trained + * model. + * + * @param model Trained model produced by [[train]]. + * @param query An input query. + * @return A prediction. + */ + def predict(model: M, query: Q): P + + /** :: DeveloperApi :: + * Engine developers should not use this directly (read on to see how + * parallel-to-local algorithm models are persisted). + * + * Parallel-to-local algorithms produce local models. By default, models will be + * serialized and stored automatically. Engine developers can override this behavior by + * mixing the [[PersistentModel]] trait into the model class, and + * PredictionIO will call [[PersistentModel.save]] instead. If it returns + * true, a [[org.apache.predictionio.workflow.PersistentModelManifest]] will be + * returned so that during deployment, PredictionIO will use + * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be + * returned and the model will be re-trained on-the-fly. + * + * @param sc Spark context + * @param modelId Model ID + * @param algoParams Algorithm parameters that trained this model + * @param bm Model + * @return The model itself for automatic persistence, an instance of + * [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual + * persistence, or Unit for re-training on deployment + */ + @DeveloperApi + override + def makePersistentModel( + sc: SparkContext, + modelId: String, + algoParams: Params, + bm: Any): Any = { + val m = bm.asInstanceOf[M] + if (m.isInstanceOf[PersistentModel[_]]) { + if (m.asInstanceOf[PersistentModel[Params]].save( + modelId, algoParams, sc)) { + PersistentModelManifest(className = m.getClass.getName) + } else { + Unit + } + } else { + m + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala new file mode 100644 index 0000000..d2123f3 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala @@ -0,0 +1,126 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.core.BaseAlgorithm +import org.apache.predictionio.workflow.PersistentModelManifest +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +/** Base class of a parallel algorithm. 
+ *
+ * A parallel algorithm can be run in parallel on a cluster and produces a
+ * model that can also be distributed across a cluster.
+ *
+ * If your input query class requires custom JSON4S serialization, the most
+ * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
+ * and mix that into your algorithm class, instead of overriding
+ * [[querySerializer]] directly.
+ *
+ * To provide the evaluation feature, one must override and implement the
+ * [[batchPredict]] method. Otherwise, an exception will be thrown when `pio eval`
+ * is used.
+ *
+ * @tparam PD Prepared data class.
+ * @tparam M Trained model class.
+ * @tparam Q Input query class.
+ * @tparam P Output prediction class.
+ * @group Algorithm
+ */
+abstract class PAlgorithm[PD, M, Q, P]
+ extends BaseAlgorithm[PD, M, Q, P] {
+
+ def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd)
+
+ /** Implement this method to produce a model from prepared data.
+ *
+ * @param pd Prepared data for model training.
+ * @return Trained model.
+ */
+ def train(sc: SparkContext, pd: PD): M
+
+ def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
+ : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs)
+
+ /** To provide the evaluation feature, one must override and implement this method
+ * to generate many predictions in batch. Otherwise, an exception will be
+ * thrown when `pio eval` is used.
+ *
+ * The default implementation throws an exception.
+ *
+ * @param m Trained model produced by [[train]].
+ * @param qs An RDD of index-query tuples. The index is used to keep track of
+ * predicted results with corresponding queries.
+ */
+ def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] =
+ throw new NotImplementedError("batchPredict not implemented")
+
+ def predictBase(baseModel: Any, query: Q): P = {
+ predict(baseModel.asInstanceOf[M], query)
+ }
+
+ /** Implement this method to produce a prediction from a query and trained
+ * model.
+ *
+ * @param model Trained model produced by [[train]].
+ * @param query An input query.
+ * @return A prediction.
+ */
+ def predict(model: M, query: Q): P
+
+ /** :: DeveloperApi ::
+ * Engine developers should not use this directly (read on to see how parallel
+ * algorithm models are persisted).
+ *
+ * In general, parallel models may contain multiple RDDs. It is not easy to
+ * infer and persist them programmatically since these RDDs may be
+ * potentially huge. To persist these models, engine developers need to mix
+ * the [[PersistentModel]] trait into the model class and implement
+ * [[PersistentModel.save]]. If it returns true, a
+ * [[org.apache.predictionio.workflow.PersistentModelManifest]] will be
+ * returned so that during deployment, PredictionIO will use
+ * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
+ * returned and the model will be re-trained on-the-fly.
+ *
+ * @param sc Spark context
+ * @param modelId Model ID
+ * @param algoParams Algorithm parameters that trained this model
+ * @param bm Model
+ * @return The model itself for automatic persistence, an instance of
+ * [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual
+ * persistence, or Unit for re-training on deployment
+ */
+ @DeveloperApi
+ override
+ def makePersistentModel(
+ sc: SparkContext,
+ modelId: String,
+ algoParams: Params,
+ bm: Any): Any = {
+ val m = bm.asInstanceOf[M]
+ if (m.isInstanceOf[PersistentModel[_]]) {
+ if (m.asInstanceOf[PersistentModel[Params]].save(
+ modelId, algoParams, sc)) {
+ PersistentModelManifest(className = m.getClass.getName)
+ } else {
+ Unit
+ }
+ } else {
+ Unit
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala
new file mode 100644
index 0000000..e595dca
--- /dev/null
+++ b/core/src/main/scala/org/apache/predictionio/controller/PDataSource.scala
@@ -0,0 +1,57 @@
+/** Copyright 2015 TappingStone, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.controller
+
+import org.apache.predictionio.core.BaseDataSource
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** Base class of a parallel data source.
+ *
+ * A parallel data source runs locally within a single machine, or in parallel
+ * on a cluster, to return data that is distributed across a cluster.
+ *
+ * @tparam TD Training data class.
+ * @tparam EI Evaluation Info class.
+ * @tparam Q Input query class.
+ * @tparam A Actual value class.
+ * @group Data Source
+ */
+
+abstract class PDataSource[TD, EI, Q, A]
+ extends BaseDataSource[TD, EI, Q, A] {
+
+ def readTrainingBase(sc: SparkContext): TD = readTraining(sc)
+
+ /** Implement this method to return only training data from a data source */
+ def readTraining(sc: SparkContext): TD
+
+ def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc)
+
+ /** To provide the evaluation feature for your engine, you must override this
+ * method to return data for evaluation from a data source. Returned data can
+ * optionally include a sequence of query and actual value pairs for
+ * evaluation purposes.
+ *
+ * The default implementation returns an empty sequence as a stub, so that
+ * an engine can be compiled without implementing evaluation.
+ */ + def readEval(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = + Seq[(TD, EI, RDD[(Q, A)])]() + + @deprecated("Use readEval() instead.", "0.9.0") + def read(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala new file mode 100644 index 0000000..66f51e4 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/PPreparator.scala @@ -0,0 +1,44 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BasePreparator +import org.apache.spark.SparkContext + +/** Base class of a parallel preparator. + * + * A parallel preparator can be run in parallel on a cluster and produces a + * prepared data that is distributed across a cluster. + * + * @tparam TD Training data class. + * @tparam PD Prepared data class. + * @group Preparator + */ +abstract class PPreparator[TD, PD] + extends BasePreparator[TD, PD] { + + def prepareBase(sc: SparkContext, td: TD): PD = { + prepare(sc, td) + } + + /** Implement this method to produce prepared data that is ready for model + * training. + * + * @param sc An Apache Spark context. + * @param trainingData Training data to be prepared. + */ + def prepare(sc: SparkContext, trainingData: TD): PD +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Params.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/Params.scala b/core/src/main/scala/org/apache/predictionio/controller/Params.scala new file mode 100644 index 0000000..bdb3f7e --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/Params.scala @@ -0,0 +1,31 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +/** Base trait for all kinds of parameters that will be passed to constructors + * of different controller classes. 
+ * + * @group Helper + */ +trait Params extends Serializable {} + +/** A concrete implementation of [[Params]] representing empty parameters. + * + * @group Helper + */ +case class EmptyParams() extends Params { + override def toString(): String = "Empty" +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala b/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala new file mode 100644 index 0000000..fb8d57b --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/PersistentModel.scala @@ -0,0 +1,112 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.spark.SparkContext + +/** Mix in and implement this trait if your model cannot be persisted by + * PredictionIO automatically. A companion object extending + * IPersistentModelLoader is required for PredictionIO to load the persisted + * model automatically during deployment. + * + * Notice that models generated by [[PAlgorithm]] cannot be persisted + * automatically by nature and must implement these traits if model persistence + * is desired. + * + * {{{ + * class MyModel extends PersistentModel[MyParams] { + * def save(id: String, params: MyParams, sc: SparkContext): Boolean = { + * ... + * } + * } + * + * object MyModel extends PersistentModelLoader[MyParams, MyModel] { + * def apply(id: String, params: MyParams, sc: Option[SparkContext]): MyModel = { + * ... + * } + * } + * }}} + * + * In Java, all you need to do is to implement this interface, and add a static + * method with 3 arguments of type String, [[Params]], and SparkContext. + * + * {{{ + * public class MyModel implements PersistentModel<MyParams>, Serializable { + * ... + * public boolean save(String id, MyParams params, SparkContext sc) { + * ... + * } + * + * public static MyModel load(String id, Params params, SparkContext sc) { + * ... + * } + * ... + * } + * }}} + * + * @tparam AP Algorithm parameters class. + * @see [[PersistentModelLoader]] + * @group Algorithm + */ +trait PersistentModel[AP <: Params] { + /** Save the model to some persistent storage. + * + * This method should return true if the model has been saved successfully so + * that PredictionIO knows that it can be restored later during deployment. + * This method should return false if the model cannot be saved (or should + * not be saved due to configuration) so that PredictionIO will re-train the + * model during deployment. All arguments of this method are provided by + * automatically by PredictionIO. + * + * @param id ID of the run that trained this model. + * @param params Algorithm parameters that were used to train this model. + * @param sc An Apache Spark context. 
+ */ + def save(id: String, params: AP, sc: SparkContext): Boolean +} + +/** Implement an object that extends this trait for PredictionIO to support + * loading a persisted model during serving deployment. + * + * @tparam AP Algorithm parameters class. + * @tparam M Model class. + * @see [[PersistentModel]] + * @group Algorithm + */ +trait PersistentModelLoader[AP <: Params, M] { + /** Implement this method to restore a persisted model that extends the + * [[PersistentModel]] trait. All arguments of this method are provided + * automatically by PredictionIO. + * + * @param id ID of the run that trained this model. + * @param params Algorithm parameters that were used to train this model. + * @param sc An optional Apache Spark context. This will be injected if the + * model was generated by a [[PAlgorithm]]. + */ + def apply(id: String, params: AP, sc: Option[SparkContext]): M +} + +/** DEPRECATED. Use [[PersistentModel]] instead. + * + * @group Algorithm */ +@deprecated("Use PersistentModel instead.", "0.9.2") +trait IPersistentModel[AP <: Params] extends PersistentModel[AP] + +/** DEPRECATED. Use [[PersistentModelLoader]] instead. + * + * @group Algorithm */ +@deprecated("Use PersistentModelLoader instead.", "0.9.2") +trait IPersistentModelLoader[AP <: Params, M] extends PersistentModelLoader[AP, M] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala b/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala new file mode 100644 index 0000000..8449a71 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/SanityCheck.scala @@ -0,0 +1,30 @@ +/** Copyright 2015 TappingStone, Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.predictionio.controller + +/** Extends a data class with this trait if you want PredictionIO to + * automatically perform sanity check on your data classes during training. + * This is very useful when you need to debug your engine. + * + * @group Helper + */ +trait SanityCheck { + /** Implement this method to perform checks on your data. This method should + * contain assertions that throw exceptions when your data does not meet + * your pre-defined requirement. + */ + def sanityCheck(): Unit +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/Utils.scala b/core/src/main/scala/org/apache/predictionio/controller/Utils.scala new file mode 100644 index 0000000..e74fe4b --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/Utils.scala @@ -0,0 +1,69 @@ +/** Copyright 2015 TappingStone, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.workflow.KryoInstantiator + +import org.json4s._ +import org.json4s.ext.JodaTimeSerializers + +import scala.io.Source + +import _root_.java.io.File +import _root_.java.io.FileOutputStream + +/** Controller utilities. + * + * @group Helper + */ +object Utils { + /** Default JSON4S serializers for PredictionIO controllers. */ + val json4sDefaultFormats = DefaultFormats.lossless ++ JodaTimeSerializers.all + + /** Save a model object as a file to a temporary location on local filesystem. + * It will first try to use the location indicated by the environmental + * variable PIO_FS_TMPDIR, then fall back to the java.io.tmpdir property. + * + * @param id Used as the filename of the file. + * @param model Model object. + */ + def save(id: String, model: Any): Unit = { + val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", System.getProperty("java.io.tmpdir")) + val modelFile = tmpdir + File.separator + id + (new File(tmpdir)).mkdirs + val fos = new FileOutputStream(modelFile) + val kryo = KryoInstantiator.newKryoInjection + fos.write(kryo(model)) + fos.close + } + + /** Load a model object from a file in a temporary location on local + * filesystem. It will first try to use the location indicated by the + * environmental variable PIO_FS_TMPDIR, then fall back to the java.io.tmpdir + * property. + * + * @param id Used as the filename of the file. + */ + def load(id: String): Any = { + val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", System.getProperty("java.io.tmpdir")) + val modelFile = tmpdir + File.separator + id + val src = Source.fromFile(modelFile)(scala.io.Codec.ISO8859) + val kryo = KryoInstantiator.newKryoInjection + val m = kryo.invert(src.map(_.toByte).toArray).get + src.close + m + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala new file mode 100644 index 0000000..6ab5382 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEngineParamsGenerator.scala @@ -0,0 +1,39 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.EngineParamsGenerator + +import scala.collection.JavaConversions.asScalaBuffer + +/** Define an engine parameter generator in Java + * + * Implementations of this abstract class can be supplied to "pio eval" as the second + * command line argument. + * + * @group Evaluation + */ +abstract class JavaEngineParamsGenerator extends EngineParamsGenerator { + + /** Set the list of [[EngineParams]]. + * + * @param engineParams A list of engine params + */ + def setEngineParamsList(engineParams: java.util.List[_ <: EngineParams]) { + engineParamsList = asScalaBuffer(engineParams) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala new file mode 100644 index 0000000..7c9c984 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/JavaEvaluation.scala @@ -0,0 +1,66 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.Evaluation +import org.apache.predictionio.controller.Metric +import org.apache.predictionio.core.BaseEngine + +import scala.collection.JavaConversions.asScalaBuffer + +/** Define an evaluation in Java. + * + * Implementations of this abstract class can be supplied to "pio eval" as the first + * argument. 
+ * + * @group Evaluation + */ + +abstract class JavaEvaluation extends Evaluation { + /** Set the [[BaseEngine]] and [[Metric]] for this [[Evaluation]] + * + * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]] + * @param metric [[Metric]] for this [[JavaEvaluation]] + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + */ + def setEngineMetric[EI, Q, P, A]( + baseEngine: BaseEngine[EI, Q, P, A], + metric: Metric[EI, Q, P, A, _]) { + + engineMetric = (baseEngine, metric) + } + + /** Set the [[BaseEngine]] and [[Metric]]s for this [[JavaEvaluation]] + * + * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]] + * @param metric [[Metric]] for this [[JavaEvaluation]] + * @param metrics Other [[Metric]]s for this [[JavaEvaluation]] + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + */ + def setEngineMetrics[EI, Q, P, A]( + baseEngine: BaseEngine[EI, Q, P, A], + metric: Metric[EI, Q, P, A, _], + metrics: java.util.List[_ <: Metric[EI, Q, P, A, _]]) { + + engineMetrics = (baseEngine, metric, asScalaBuffer(metrics)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala new file mode 100644 index 0000000..41cbbff --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaAlgorithm.scala @@ -0,0 +1,31 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.LAlgorithm + +import scala.reflect.ClassTag + +/** Base class of a Java local algorithm. Refer to [[LAlgorithm]] for documentation. + * + * @tparam PD Prepared data class. + * @tparam M Trained model class. + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Algorithm + */ +abstract class LJavaAlgorithm[PD, M, Q, P] + extends LAlgorithm[PD, M, Q, P]()(ClassTag.AnyRef.asInstanceOf[ClassTag[M]]) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala new file mode 100644 index 0000000..aca2ce6 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaDataSource.scala @@ -0,0 +1,31 @@ +/** Copyright 2015 TappingStone, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.LDataSource + +import scala.reflect.ClassTag + +/** Base class of a Java local data source. Refer to [[LDataSource]] for documentation. + * + * @tparam TD Training data class. + * @tparam EI Evaluation Info class. + * @tparam Q Input query class. + * @tparam A Actual value class. + * @group Data Source + */ +abstract class LJavaDataSource[TD, EI, Q, A] + extends LDataSource[TD, EI, Q, A]()(ClassTag.AnyRef.asInstanceOf[ClassTag[TD]]) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala new file mode 100644 index 0000000..8def08b --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaPreparator.scala @@ -0,0 +1,29 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.LPreparator + +import scala.reflect.ClassTag + +/** Base class of a Java local preparator. Refer to [[LPreparator]] for documentation. + * + * @tparam TD Training data class. + * @tparam PD Prepared data class. + * @group Preparator + */ +abstract class LJavaPreparator[TD, PD] + extends LPreparator[TD, PD]()(ClassTag.AnyRef.asInstanceOf[ClassTag[PD]]) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala new file mode 100644 index 0000000..ee380c3 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/LJavaServing.scala @@ -0,0 +1,26 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.LServing + +/** Base class of Java local serving. Refer to [[LServing]] for documentation. + * + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Serving + */ +abstract class LJavaServing[Q, P] extends LServing[Q, P] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala new file mode 100644 index 0000000..f41903d --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/P2LJavaAlgorithm.scala @@ -0,0 +1,33 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.P2LAlgorithm + +import scala.reflect.ClassTag + +/** Base class of a Java parallel-to-local algorithm. Refer to [[P2LAlgorithm]] for documentation. + * + * @tparam PD Prepared data class. + * @tparam M Trained model class. + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Algorithm + */ +abstract class P2LJavaAlgorithm[PD, M, Q, P] + extends P2LAlgorithm[PD, M, Q, P]()( + ClassTag.AnyRef.asInstanceOf[ClassTag[M]], + ClassTag.AnyRef.asInstanceOf[ClassTag[Q]]) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala new file mode 100644 index 0000000..38eaa70 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaAlgorithm.scala @@ -0,0 +1,28 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.PAlgorithm + +/** Base class of a Java parallel algorithm. Refer to [[PAlgorithm]] for documentation. + * + * @tparam PD Prepared data class. + * @tparam M Trained model class. + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Algorithm + */ +abstract class PJavaAlgorithm[PD, M, Q, P] extends PAlgorithm[PD, M, Q, P] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala new file mode 100644 index 0000000..cb04da6 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaDataSource.scala @@ -0,0 +1,28 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.PDataSource + +/** Base class of a Java parallel data source. Refer to [[PDataSource]] for documentation. + * + * @tparam TD Training data class. + * @tparam EI Evaluation Info class. + * @tparam Q Input query class. + * @tparam A Actual value class. + * @group Data Source + */ +abstract class PJavaDataSource[TD, EI, Q, A] extends PDataSource[TD, EI, Q, A] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala new file mode 100644 index 0000000..5fb953f --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/PJavaPreparator.scala @@ -0,0 +1,26 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import org.apache.predictionio.controller.PPreparator + +/** Base class of a Java parallel preparator. Refer to [[PPreparator]] for documentation + * + * @tparam TD Training data class. + * @tparam PD Prepared data class. + * @group Preparator + */ +abstract class PJavaPreparator[TD, PD] extends PPreparator[TD, PD] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala b/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala new file mode 100644 index 0000000..970cd6c --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/java/SerializableComparator.scala @@ -0,0 +1,20 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller.java + +import java.util.Comparator + +trait SerializableComparator[T] extends Comparator[T] with java.io.Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/package.scala b/core/src/main/scala/org/apache/predictionio/controller/package.scala new file mode 100644 index 0000000..35b1e81 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/package.scala @@ -0,0 +1,168 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio + +/** Provides building blocks for writing a complete prediction engine + * consisting of DataSource, Preparator, Algorithm, Serving, and Evaluation. 
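SerializableComparator simply combines java.util.Comparator with java.io.Serializable so that an ordering can be serialized along with the objects that use it (for example, into Spark closures). A small, hypothetical implementation:

import org.apache.predictionio.controller.java.SerializableComparator

// Orders boxed doubles in descending order; being Serializable, it can be
// shipped to Spark executors safely.
class DescendingDoubleComparator extends SerializableComparator[java.lang.Double] {
  override def compare(a: java.lang.Double, b: java.lang.Double): Int =
    java.lang.Double.compare(b, a)
}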
+ * + * == Start Building an Engine == + * The starting point of a prediction engine is the [[Engine]] class. + * + * == The DASE Paradigm == + * The building blocks together form the DASE paradigm. Learn more about DASE + * [[http://docs.prediction.io/customize/ here]]. + * + * == Types of Building Blocks == + * Depending on the problem you are solving, you will need to pick the appropriate + * flavors of building blocks. + * + * === Engines === + * There are 3 typical engine configurations: + * + * 1. [[PDataSource]], [[PPreparator]], [[P2LAlgorithm]], [[LServing]] + * 2. [[PDataSource]], [[PPreparator]], [[PAlgorithm]], [[LServing]] + * 3. [[LDataSource]], [[LPreparator]], [[LAlgorithm]], [[LServing]] + * + * In both configurations 1 and 2, data is sourced and prepared in a + * parallelized fashion, with the data represented as RDDs. + * + * The difference between configurations 1 and 2 comes at the algorithm stage. + * In configuration 1, the algorithm operates on potentially large data as RDDs + * in the Spark cluster, and eventually outputs a model that is small enough to + * fit in a single machine. + * + * On the other hand, configuration 2 outputs a model that is potentially too + * large to fit in a single machine, and must reside in the Spark cluster as + * RDD(s). + * + * With configuration 1 ([[P2LAlgorithm]]), PredictionIO will automatically + * try to persist the model to local disk or HDFS if the model is serializable. + * + * With configuration 2 ([[PAlgorithm]]), PredictionIO will not automatically + * try to persist the model, unless the model implements the [[PersistentModel]] + * trait. + * + * In special circumstances where both the data and the model are small, + * configuration 3 may be used. Beware that RDDs cannot be used with + * configuration 3. + * + * === Data Source === + * [[PDataSource]] is probably the most used data source base class with the + * ability to process RDD-based data. [[LDataSource]] '''cannot''' handle + * RDD-based data. Use it only when you have a special requirement. + * + * === Preparator === + * With [[PDataSource]], you must pick [[PPreparator]]. The same applies to + * [[LDataSource]] and [[LPreparator]]. + * + * === Algorithm === + * The workhorse of the engine comes in 3 different flavors. + * + * ==== P2LAlgorithm ==== + * Produces a model that is small enough to fit in a single machine from + * [[PDataSource]] and [[PPreparator]]. The model '''cannot''' contain any RDD. + * If the produced model is serializable, PredictionIO will try to + * automatically persist it. In addition, P2LAlgorithm.batchPredict is + * already implemented for [[Evaluation]] purposes. + * + * ==== PAlgorithm ==== + * Produces a model that could contain RDDs from [[PDataSource]] and + * [[PPreparator]]. PredictionIO will not try to persist it automatically + * unless the model implements [[PersistentModel]]. [[PAlgorithm.batchPredict]] + * must be implemented for [[Evaluation]]. + * + * ==== LAlgorithm ==== + * Produces a model that is small enough to fit in a single machine from + * [[LDataSource]] and [[LPreparator]]. The model '''cannot''' contain any RDD. + * If the produced model is serializable, PredictionIO will try to + * automatically persist it. In addition, LAlgorithm.batchPredict is + * already implemented for [[Evaluation]] purposes. + * + * === Serving === + * The serving component comes with only 1 flavor--[[LServing]]. At the serving + * stage, it is assumed that the result being served is already at a human-consumable size.
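To make configuration 1 concrete, the sketch below wires hypothetical components into the four building blocks. All class names and logic are invented for illustration, and the readTraining/prepare/train/predict/serve signatures are assumed to match the standard controller API.

import org.apache.predictionio.controller._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical engine types, for illustration only.
case class Query(user: String)
case class PredictedResult(score: Double)
case class TrainingData(ratings: RDD[(String, Double)])
class MeanModel(val mean: Double) extends Serializable

class MyDataSource
  extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult] {
  def readTraining(sc: SparkContext): TrainingData =
    TrainingData(sc.parallelize(Seq(("alice", 4.0), ("bob", 3.0))))
}

class MyPreparator extends PPreparator[TrainingData, TrainingData] {
  def prepare(sc: SparkContext, td: TrainingData): TrainingData = td
}

// P2LAlgorithm: trains on an RDD but produces a small, single-machine model.
class MeanAlgorithm
  extends P2LAlgorithm[TrainingData, MeanModel, Query, PredictedResult] {
  def train(sc: SparkContext, pd: TrainingData): MeanModel =
    new MeanModel(pd.ratings.values.mean())
  def predict(model: MeanModel, query: Query): PredictedResult =
    PredictedResult(model.mean)
}

class MyServing extends LServing[Query, PredictedResult] {
  def serve(query: Query, predictions: Seq[PredictedResult]): PredictedResult =
    predictions.head
}

// Wire the four components together (configuration 1).
object MyEngine {
  def apply() = new Engine(
    classOf[MyDataSource],
    classOf[MyPreparator],
    Map("mean" -> classOf[MeanAlgorithm]),
    classOf[MyServing])
}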
+ * + * == Model Persistence == + * PredictionIO tries its best to persist trained models automatically. Please + * refer to [[LAlgorithm.makePersistentModel]], + * [[P2LAlgorithm.makePersistentModel]], and [[PAlgorithm.makePersistentModel]] + * for descriptions on different strategies. + */ +package object controller { + + /** Base class of several helper types that represent emptiness + * + * @group Helper + */ + class SerializableClass() extends Serializable + + /** Empty data source parameters. + * @group Helper + */ + type EmptyDataSourceParams = EmptyParams + + /** Empty data parameters. + * @group Helper + */ + type EmptyDataParams = EmptyParams + + /** Empty evaluation info. + * @group Helper + */ + type EmptyEvaluationInfo = SerializableClass + + /** Empty preparator parameters. + * @group Helper + */ + type EmptyPreparatorParams = EmptyParams + + /** Empty algorithm parameters. + * @group Helper + */ + type EmptyAlgorithmParams = EmptyParams + + /** Empty serving parameters. + * @group Helper + */ + type EmptyServingParams = EmptyParams + + /** Empty metrics parameters. + * @group Helper + */ + type EmptyMetricsParams = EmptyParams + + /** Empty training data. + * @group Helper + */ + type EmptyTrainingData = SerializableClass + + /** Empty prepared data. + * @group Helper + */ + type EmptyPreparedData = SerializableClass + + /** Empty model. + * @group Helper + */ + type EmptyModel = SerializableClass + + /** Empty actual result. + * @group Helper + */ + type EmptyActualResult = SerializableClass + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala b/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala new file mode 100644 index 0000000..5f90d99 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/core/AbstractDoer.scala @@ -0,0 +1,66 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
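A brief, hypothetical illustration of these aliases in use. EmptyParams() is assumed here to be the no-argument Params case class from this package; the aliases themselves resolve to either EmptyParams or SerializableClass as defined above.

import org.apache.predictionio.controller._

object EmptyPlaceholders {
  // Placeholder values for slots an engine has nothing to report for.
  val evaluationInfo: EmptyEvaluationInfo = new SerializableClass()
  val trainingData: EmptyTrainingData = new SerializableClass()
  val algorithmParams: EmptyAlgorithmParams = EmptyParams() // assumed no-arg case class
}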
+ */ + +package org.apache.predictionio.core + +import grizzled.slf4j.Logging +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.controller.Params + +/** :: DeveloperApi :: + * Base class for all controllers + */ +@DeveloperApi +abstract class AbstractDoer extends Serializable + +/** :: DeveloperApi :: + * Provides a facility to instantiate controller classes + */ +@DeveloperApi +object Doer extends Logging { + /** :: DeveloperApi :: + * Instantiates a controller class using the supplied controller parameters as + * constructor parameters + * + * @param cls Class of the controller class + * @param params Parameters of the controller class + * @tparam C Controller class + * @return An instance of the controller class + */ + @DeveloperApi + def apply[C <: AbstractDoer] ( + cls: Class[_ <: C], params: Params): C = { + + // Subclasses allow only two kinds of constructors: + // 1. Constructor with P <: Params. + // 2. Empty constructor. + // Try (1) first; if that fails, try (2). + try { + val constr = cls.getConstructor(params.getClass) + constr.newInstance(params) + } catch { + case e: NoSuchMethodException => try { + val zeroConstr = cls.getConstructor() + zeroConstr.newInstance() + } catch { + case e: NoSuchMethodException => + error(s"${params.getClass.getName} was used as the constructor " + + s"argument to ${e.getMessage}, but no constructor can handle it. " + + "Aborting.") + sys.exit(1) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala new file mode 100644 index 0000000..7774861 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/core/BaseAlgorithm.scala @@ -0,0 +1,123 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
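The hypothetical sketch below shows both constructor shapes that Doer can handle: a controller taking a Params subclass, and one with an empty constructor that Doer falls back to.

import org.apache.predictionio.controller.Params
import org.apache.predictionio.core.{AbstractDoer, Doer}

// Hypothetical parameter and controller classes.
case class CountParams(n: Int) extends Params

class Counter(params: CountParams) extends AbstractDoer {
  def count: Int = params.n
}

class DefaultCounter extends AbstractDoer {
  def count: Int = 0
}

object DoerExample {
  val withParams  = Doer(classOf[Counter], CountParams(5))        // matches the CountParams constructor
  val withDefault = Doer(classOf[DefaultCounter], CountParams(5)) // falls back to the empty constructor
}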
+ */ + +package org.apache.predictionio.core + +import com.google.gson.TypeAdapterFactory +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.controller.Params +import org.apache.predictionio.controller.Utils +import net.jodah.typetools.TypeResolver +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +/** :: DeveloperApi :: + * Base trait with default custom query serializer, exposed to engine developers + * via [[org.apache.predictionio.controller.CustomQuerySerializer]] + */ +@DeveloperApi +trait BaseQuerySerializer { + /** :: DeveloperApi :: + * Serializer for Scala query classes using + * [[org.apache.predictionio.controller.Utils.json4sDefaultFormats]] + */ + @DeveloperApi + @transient lazy val querySerializer = Utils.json4sDefaultFormats + + /** :: DeveloperApi :: + * Serializer for Java query classes using Gson + */ + @DeveloperApi + @transient lazy val gsonTypeAdapterFactories = Seq.empty[TypeAdapterFactory] +} + +/** :: DeveloperApi :: + * Base class of all algorithm controllers + * + * @tparam PD Prepared data class + * @tparam M Model class + * @tparam Q Query class + * @tparam P Predicted result class + */ +@DeveloperApi +abstract class BaseAlgorithm[PD, M, Q, P] + extends AbstractDoer with BaseQuerySerializer { + /** :: DeveloperApi :: + * Engine developers should not use this directly. This is called by workflow + * to train a model. + * + * @param sc Spark context + * @param pd Prepared data + * @return Trained model + */ + @DeveloperApi + def trainBase(sc: SparkContext, pd: PD): M + + /** :: DeveloperApi :: + * Engine developers should not use this directly. This is called by + * evaluation workflow to perform batch prediction. + * + * @param sc Spark context + * @param bm Model + * @param qs Batch of queries + * @return Batch of predicted results + */ + @DeveloperApi + def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]) + : RDD[(Long, P)] + + /** :: DeveloperApi :: + * Engine developers should not use this directly. Called by serving to + * perform a single prediction. + * + * @param bm Model + * @param q Query + * @return Predicted result + */ + @DeveloperApi + def predictBase(bm: Any, q: Q): P + + /** :: DeveloperApi :: + * Engine developers should not use this directly. Prepares a model for + * persistence by the downstream consumer. PredictionIO supports 3 types of + * model persistence: automatic persistence, manual persistence, and + * re-training on deployment. This method provides a way for downstream + * modules to determine how the model should be persisted.
+ * + * @param sc Spark context + * @param modelId Model ID + * @param algoParams Algorithm parameters that trained this model + * @param bm Model + * @return The model itself for automatic persistence, an instance of + * [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual + * persistence, or Unit for re-training on deployment + */ + @DeveloperApi + def makePersistentModel( + sc: SparkContext, + modelId: String, + algoParams: Params, + bm: Any): Any = Unit + + /** :: DeveloperApi :: + * Obtains the type signature of query for this algorithm + * + * @return Type signature of query + */ + def queryClass: Class[Q] = { + val types = TypeResolver.resolveRawArguments(classOf[BaseAlgorithm[PD, M, Q, P]], getClass) + types(2).asInstanceOf[Class[Q]] + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala new file mode 100644 index 0000000..96e2548 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/core/BaseDataSource.scala @@ -0,0 +1,52 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.core + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +/** :: DeveloperApi :: + * Base class of all data source controllers + * + * @tparam TD Training data class + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam A Actual result class + */ +@DeveloperApi +abstract class BaseDataSource[TD, EI, Q, A] extends AbstractDoer { + /** :: DeveloperApi :: + * Engine developer should not use this directly. This is called by workflow + * to read training data. + * + * @param sc Spark context + * @return Training data + */ + @DeveloperApi + def readTrainingBase(sc: SparkContext): TD + + /** :: DeveloperApi :: + * Engine developer should not use this directly. This is called by + * evaluation workflow to read training and validation data. + * + * @param sc Spark context + * @return Sets of training data, evaluation information, queries, and actual + * results + */ + @DeveloperApi + def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala b/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala new file mode 100644 index 0000000..6dfce7d --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/core/BaseEngine.scala @@ -0,0 +1,100 @@ +/** Copyright 2015 TappingStone, Inc. 
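As a schematic, hypothetical illustration of the readTrainingBase/readEvalBase contract (real engines would extend the controller-level data source classes rather than BaseDataSource directly):

import org.apache.predictionio.core.BaseDataSource
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// TD = RDD[(String, Double)], EI = Unit, Q = String, A = Double; all hypothetical.
class RatingsDataSource
  extends BaseDataSource[RDD[(String, Double)], Unit, String, Double] {
  def readTrainingBase(sc: SparkContext): RDD[(String, Double)] =
    sc.parallelize(Seq(("alice", 4.0), ("bob", 3.0)))
  def readEvalBase(sc: SparkContext): Seq[(RDD[(String, Double)], Unit, RDD[(String, Double)])] = {
    val td = readTrainingBase(sc)
    Seq((td, (), td)) // reuse the training data; queries are users, actuals are ratings
  }
}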
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.core + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption +import org.apache.predictionio.workflow.WorkflowParams +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.json4s.JValue + +/** :: DeveloperApi :: + * Base class of all engine controller classes + * + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + */ +@DeveloperApi +abstract class BaseEngine[EI, Q, P, A] extends Serializable { + /** :: DeveloperApi :: + * Implement this method so that training this engine returns a list of + * models. + * + * @param sc An instance of SparkContext. + * @param engineParams An instance of [[EngineParams]] for running a single training. + * @param engineInstanceId The ID of the engine instance being trained. + * @param params An instance of [[WorkflowParams]] that controls the workflow. + * @return A list of models. + */ + @DeveloperApi + def train( + sc: SparkContext, + engineParams: EngineParams, + engineInstanceId: String, + params: WorkflowParams): Seq[Any] + + /** :: DeveloperApi :: + * Implement this method so that [[org.apache.predictionio.controller.Evaluation]] can + * use it to generate inputs for [[org.apache.predictionio.controller.Metric]]. + * + * @param sc An instance of SparkContext. + * @param engineParams An instance of [[EngineParams]] for running a single evaluation. + * @param params An instance of [[WorkflowParams]] that controls the workflow. + * @return A list of evaluation information and RDDs of (query, predicted + * result, actual result) tuples. + */ + @DeveloperApi + def eval( + sc: SparkContext, + engineParams: EngineParams, + params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] + + /** :: DeveloperApi :: + * Override this method to further optimize the process that runs multiple + * evaluations (during tuning, for example). By default, this method calls + * [[eval]] for each element in the engine parameters list. + * + * @param sc An instance of SparkContext. + * @param engineParamsList A list of [[EngineParams]] for running batch evaluation. + * @param params An instance of [[WorkflowParams]] that controls the workflow. + * @return A list of engine parameters and evaluation result (from [[eval]]) tuples. + */ + @DeveloperApi + def batchEval( + sc: SparkContext, + engineParamsList: Seq[EngineParams], + params: WorkflowParams) + : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = { + engineParamsList.map { engineParams => + (engineParams, eval(sc, engineParams, params)) + } + } + + /** :: DeveloperApi :: + * Implement this method to convert a JValue (read from an engine variant + * JSON file) to an instance of [[EngineParams]]. + * + * @param variantJson Content of the engine variant JSON as JValue. + * @param jsonExtractor The JSON extractor option to use when converting the engine variant JSON to [[EngineParams]].
+ * @return An instance of [[EngineParams]] converted from JSON. + */ + @DeveloperApi + def jValueToEngineParams(variantJson: JValue, jsonExtractor: JsonExtractorOption): EngineParams = + throw new NotImplementedError("JSON to EngineParams is not implemented.") +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala b/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala new file mode 100644 index 0000000..71a086d --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/core/BaseEvaluator.scala @@ -0,0 +1,72 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.core + +import org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.annotation.Experimental +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.Evaluation +import org.apache.predictionio.workflow.WorkflowParams +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +/** :: DeveloperApi :: + * Base class of all evaluator controller classes + * + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + * @tparam ER Evaluation result class + */ +@DeveloperApi +abstract class BaseEvaluator[EI, Q, P, A, ER <: BaseEvaluatorResult] + extends AbstractDoer { + /** :: DeveloperApi :: + * Engine developers should not use this directly. This is called by + * evaluation workflow to perform evaluation. + * + * @param sc Spark context + * @param evaluation Evaluation to run + * @param engineEvalDataSet Sets of engine parameters and data for evaluation + * @param params Evaluation workflow parameters + * @return Evaluation result + */ + @DeveloperApi + def evaluateBase( + sc: SparkContext, + evaluation: Evaluation, + engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])], + params: WorkflowParams): ER +} + +/** Base trait of evaluator result */ +trait BaseEvaluatorResult extends Serializable { + /** A short description of the result */ + def toOneLiner(): String = "" + + /** HTML portion of the rendered evaluator results */ + def toHTML(): String = "" + + /** JSON portion of the rendered evaluator results */ + def toJSON(): String = "" + + /** :: Experimental :: + * Indicate if this result is inserted into database + */ + @Experimental + val noSave: Boolean = false +}
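A minimal, hypothetical evaluator result implementing this trait might look like:

import org.apache.predictionio.core.BaseEvaluatorResult

// Carries a single score; toOneLiner/toJSON feed the rendered output,
// and noSave = true skips saving the result to the database.
case class ScoreResult(score: Double) extends BaseEvaluatorResult {
  override def toOneLiner(): String = f"score = $score%.4f"
  override def toJSON(): String = s"""{"score": $score}"""
  override val noSave: Boolean = true
}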
