http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Engine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/Engine.scala b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala new file mode 100644 index 0000000..c875a9f --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala @@ -0,0 +1,829 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.controller + +import grizzled.slf4j.Logger +import org.apache.predictionio.core.BaseAlgorithm +import org.apache.predictionio.core.BaseDataSource +import org.apache.predictionio.core.BaseEngine +import org.apache.predictionio.core.BasePreparator +import org.apache.predictionio.core.BaseServing +import org.apache.predictionio.core.Doer +import org.apache.predictionio.data.storage.EngineInstance +import org.apache.predictionio.data.storage.StorageClientException +import org.apache.predictionio.workflow.CreateWorkflow +import org.apache.predictionio.workflow.EngineLanguage +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption +import org.apache.predictionio.workflow.NameParamsSerializer +import org.apache.predictionio.workflow.PersistentModelManifest +import org.apache.predictionio.workflow.SparkWorkflowUtils +import org.apache.predictionio.workflow.StopAfterPrepareInterruption +import org.apache.predictionio.workflow.StopAfterReadInterruption +import org.apache.predictionio.workflow.WorkflowParams +import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read + +import scala.collection.JavaConversions +import scala.language.implicitConversions + +/** This class chains up the entire data process. PredictionIO uses this + * information to create workflows and deployments. In Scala, you should + * implement an object that extends the [[EngineFactory]] trait similar to the + * following example. 
+ * + * {{{ + * object ItemRankEngine extends EngineFactory { + * def apply() = { + * new Engine( + * classOf[ItemRankDataSource], + * classOf[ItemRankPreparator], + * Map( + * "knn" -> classOf[KNNAlgorithm], + * "rand" -> classOf[RandomAlgorithm], + * "mahoutItemBased" -> classOf[MahoutItemBasedAlgorithm]), + * classOf[ItemRankServing]) + * } + * } + * }}} + * + * @see [[EngineFactory]] + * @tparam TD Training data class. + * @tparam EI Evaluation info class. + * @tparam PD Prepared data class. + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @tparam A Actual value class. + * @param dataSourceClassMap Map of data source names to class. + * @param preparatorClassMap Map of preparator names to class. + * @param algorithmClassMap Map of algorithm names to classes. + * @param servingClassMap Map of serving names to class. + * @group Engine + */ +class Engine[TD, EI, PD, Q, P, A]( + val dataSourceClassMap: Map[String, + Class[_ <: BaseDataSource[TD, EI, Q, A]]], + val preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]], + val algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]], + val servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]]) + extends BaseEngine[EI, Q, P, A] { + + private[prediction] + implicit lazy val formats = Utils.json4sDefaultFormats + + new NameParamsSerializer + + @transient lazy protected val logger = Logger[this.type] + + /** This auxiliary constructor is provided for backward compatibility. + * + * @param dataSourceClass Data source class. + * @param preparatorClass Preparator class. + * @param algorithmClassMap Map of algorithm names to classes. + * @param servingClass Serving class. 
+ */ + def this( + dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]], + preparatorClass: Class[_ <: BasePreparator[TD, PD]], + algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]], + servingClass: Class[_ <: BaseServing[Q, P]]) = this( + Map("" -> dataSourceClass), + Map("" -> preparatorClass), + algorithmClassMap, + Map("" -> servingClass) + ) + + /** Java-friendly constructor + * + * @param dataSourceClass Data source class. + * @param preparatorClass Preparator class. + * @param algorithmClassMap Map of algorithm names to classes. + * @param servingClass Serving class. + */ + def this(dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]], + preparatorClass: Class[_ <: BasePreparator[TD, PD]], + algorithmClassMap: _root_.java.util.Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]], + servingClass: Class[_ <: BaseServing[Q, P]]) = this( + Map("" -> dataSourceClass), + Map("" -> preparatorClass), + JavaConversions.mapAsScalaMap(algorithmClassMap).toMap, + Map("" -> servingClass) + ) + + /** Returns a new Engine instance, mimicking case class's copy method behavior. + */ + def copy( + dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]] + = dataSourceClassMap, + preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]] + = preparatorClassMap, + algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]] + = algorithmClassMap, + servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]] + = servingClassMap): Engine[TD, EI, PD, Q, P, A] = { + new Engine( + dataSourceClassMap, + preparatorClassMap, + algorithmClassMap, + servingClassMap) + } + + /** Training this engine would return a list of models. + * + * @param sc An instance of SparkContext. + * @param engineParams An instance of [[EngineParams]] for running a single training. + * @param params An instance of [[WorkflowParams]] that controls the workflow. + * @return A list of models. 
+ */ + def train( + sc: SparkContext, + engineParams: EngineParams, + engineInstanceId: String, + params: WorkflowParams): Seq[Any] = { + val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams + val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams) + + val (preparatorName, preparatorParams) = engineParams.preparatorParams + val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams) + + val algoParamsList = engineParams.algorithmParamsList + require( + algoParamsList.size > 0, + "EngineParams.algorithmParamsList must have at least 1 element.") + + val algorithms = algoParamsList.map { case (algoName, algoParams) => + Doer(algorithmClassMap(algoName), algoParams) + } + + val models = Engine.train( + sc, dataSource, preparator, algorithms, params) + + val algoCount = algorithms.size + val algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)] = + (0 until algoCount).map { ax => { + // val (name, params) = algoParamsList(ax) + val (name, params) = algoParamsList(ax) + (name, params, algorithms(ax), models(ax)) + }} + + makeSerializableModels( + sc, + engineInstanceId = engineInstanceId, + algoTuples = algoTuples) + } + + /** Algorithm models can be persisted before deploy. However, it is also + * possible that models are not persisted. This method retrains non-persisted + * models and return a list of models that can be used directly in deploy. + */ + private[prediction] + def prepareDeploy( + sc: SparkContext, + engineParams: EngineParams, + engineInstanceId: String, + persistedModels: Seq[Any], + params: WorkflowParams): Seq[Any] = { + + val algoParamsList = engineParams.algorithmParamsList + val algorithms = algoParamsList.map { case (algoName, algoParams) => + Doer(algorithmClassMap(algoName), algoParams) + } + + val models = if (persistedModels.exists(m => m.isInstanceOf[Unit.type])) { + // If any of persistedModels is Unit, we need to re-train the model. 
+ logger.info("Some persisted models are Unit, need to re-train.") + val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams + val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams) + + val (preparatorName, preparatorParams) = engineParams.preparatorParams + val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams) + + val td = dataSource.readTrainingBase(sc) + val pd = preparator.prepareBase(sc, td) + + val models = algorithms.zip(persistedModels).map { case (algo, m) => + m match { + case Unit => algo.trainBase(sc, pd) + case _ => m + } + } + models + } else { + logger.info("Using persisted model") + persistedModels + } + + models + .zip(algorithms) + .zip(algoParamsList) + .zipWithIndex + .map { + case (((model, algo), (algoName, algoParams)), ax) => { + model match { + case modelManifest: PersistentModelManifest => { + logger.info("Custom-persisted model detected for algorithm " + + algo.getClass.getName) + SparkWorkflowUtils.getPersistentModel( + modelManifest, + Seq(engineInstanceId, ax, algoName).mkString("-"), + algoParams, + Some(sc), + getClass.getClassLoader) + } + case m => { + try { + logger.info( + s"Loaded model ${m.getClass.getName} for algorithm " + + s"${algo.getClass.getName}") + sc.stop + m + } catch { + case e: NullPointerException => + logger.warn( + s"Null model detected for algorithm ${algo.getClass.getName}") + m + } + } + } // model match + } + } + } + + /** Extract model for persistent layer. + * + * PredictionIO presist models for future use. It allows custom + * implementation for persisting models. You need to implement the + * [[org.apache.predictionio.controller.PersistentModel]] interface. This method + * traverses all models in the workflow. If the model is a + * [[org.apache.predictionio.controller.PersistentModel]], it calls the save method + * for custom persistence logic. 
+ * + * For model doesn't support custom logic, PredictionIO serializes the whole + * model if the corresponding algorithm is local. On the other hand, if the + * model is parallel (i.e. model associated with a number of huge RDDS), this + * method return Unit, in which case PredictionIO will retrain the whole + * model from scratch next time it is used. + */ + private def makeSerializableModels( + sc: SparkContext, + engineInstanceId: String, + // AlgoName, Algo, Model + algoTuples: Seq[(String, Params, BaseAlgorithm[_, _, _, _], Any)] + ): Seq[Any] = { + + logger.info(s"engineInstanceId=$engineInstanceId") + + algoTuples + .zipWithIndex + .map { case ((name, params, algo, model), ax) => + algo.makePersistentModel( + sc = sc, + modelId = Seq(engineInstanceId, ax, name).mkString("-"), + algoParams = params, + bm = model) + } + } + + /** This is implemented such that [[org.apache.predictionio.controller.Evaluation]] can + * use this method to generate inputs for [[org.apache.predictionio.controller.Metric]]. + * + * @param sc An instance of SparkContext. + * @param engineParams An instance of [[EngineParams]] for running a single evaluation. + * @param params An instance of [[WorkflowParams]] that controls the workflow. + * @return A list of evaluation information and RDD of query, predicted + * result, and actual result tuple tuple. 
+ */ + def eval( + sc: SparkContext, + engineParams: EngineParams, + params: WorkflowParams) + : Seq[(EI, RDD[(Q, P, A)])] = { + val (dataSourceName, dataSourceParams) = engineParams.dataSourceParams + val dataSource = Doer(dataSourceClassMap(dataSourceName), dataSourceParams) + + val (preparatorName, preparatorParams) = engineParams.preparatorParams + val preparator = Doer(preparatorClassMap(preparatorName), preparatorParams) + + val algoParamsList = engineParams.algorithmParamsList + require( + algoParamsList.size > 0, + "EngineParams.algorithmParamsList must have at least 1 element.") + + val algorithms = algoParamsList.map { case (algoName, algoParams) => { + try { + Doer(algorithmClassMap(algoName), algoParams) + } catch { + case e: NoSuchElementException => { + if (algoName == "") { + logger.error("Empty algorithm name supplied but it could not " + + "match with any algorithm in the engine's definition. " + + "Existing algorithm name(s) are: " + + s"${algorithmClassMap.keys.mkString(", ")}. Aborting.") + } else { + logger.error(s"$algoName cannot be found in the engine's " + + "definition. Existing algorithm name(s) are: " + + s"${algorithmClassMap.keys.mkString(", ")}. 
Aborting.") + } + sys.exit(1) + } + } + }} + + val (servingName, servingParams) = engineParams.servingParams + val serving = Doer(servingClassMap(servingName), servingParams) + + Engine.eval(sc, dataSource, preparator, algorithms, serving) + } + + override def jValueToEngineParams( + variantJson: JValue, + jsonExtractor: JsonExtractorOption): EngineParams = { + + val engineLanguage = EngineLanguage.Scala + // Extract EngineParams + logger.info(s"Extracting datasource params...") + val dataSourceParams: (String, Params) = + WorkflowUtils.getParamsFromJsonByFieldAndClass( + variantJson, + "datasource", + dataSourceClassMap, + engineLanguage, + jsonExtractor) + logger.info(s"Datasource params: $dataSourceParams") + + logger.info(s"Extracting preparator params...") + val preparatorParams: (String, Params) = + WorkflowUtils.getParamsFromJsonByFieldAndClass( + variantJson, + "preparator", + preparatorClassMap, + engineLanguage, + jsonExtractor) + logger.info(s"Preparator params: $preparatorParams") + + val algorithmsParams: Seq[(String, Params)] = + variantJson findField { + case JField("algorithms", _) => true + case _ => false + } map { jv => + val algorithmsParamsJson = jv._2 + algorithmsParamsJson match { + case JArray(s) => s.map { algorithmParamsJValue => + val eap = algorithmParamsJValue.extract[CreateWorkflow.AlgorithmParams] + ( + eap.name, + WorkflowUtils.extractParams( + engineLanguage, + compact(render(eap.params)), + algorithmClassMap(eap.name), + jsonExtractor) + ) + } + case _ => Nil + } + } getOrElse Seq(("", EmptyParams())) + + logger.info(s"Extracting serving params...") + val servingParams: (String, Params) = + WorkflowUtils.getParamsFromJsonByFieldAndClass( + variantJson, + "serving", + servingClassMap, + engineLanguage, + jsonExtractor) + logger.info(s"Serving params: $servingParams") + + new EngineParams( + dataSourceParams = dataSourceParams, + preparatorParams = preparatorParams, + algorithmParamsList = algorithmsParams, + servingParams = 
servingParams) + } + + private[prediction] def engineInstanceToEngineParams( + engineInstance: EngineInstance, + jsonExtractor: JsonExtractorOption): EngineParams = { + + implicit val formats = DefaultFormats + val engineLanguage = EngineLanguage.Scala + + val dataSourceParamsWithName: (String, Params) = { + val (name, params) = + read[(String, JValue)](engineInstance.dataSourceParams) + if (!dataSourceClassMap.contains(name)) { + logger.error(s"Unable to find datasource class with name '$name'" + + " defined in Engine.") + sys.exit(1) + } + val extractedParams = WorkflowUtils.extractParams( + engineLanguage, + compact(render(params)), + dataSourceClassMap(name), + jsonExtractor) + (name, extractedParams) + } + + val preparatorParamsWithName: (String, Params) = { + val (name, params) = + read[(String, JValue)](engineInstance.preparatorParams) + if (!preparatorClassMap.contains(name)) { + logger.error(s"Unable to find preparator class with name '$name'" + + " defined in Engine.") + sys.exit(1) + } + val extractedParams = WorkflowUtils.extractParams( + engineLanguage, + compact(render(params)), + preparatorClassMap(name), + jsonExtractor) + (name, extractedParams) + } + + val algorithmsParamsWithNames = + read[Seq[(String, JValue)]](engineInstance.algorithmsParams).map { + case (algoName, params) => + val extractedParams = WorkflowUtils.extractParams( + engineLanguage, + compact(render(params)), + algorithmClassMap(algoName), + jsonExtractor) + (algoName, extractedParams) + } + + val servingParamsWithName: (String, Params) = { + val (name, params) = read[(String, JValue)](engineInstance.servingParams) + if (!servingClassMap.contains(name)) { + logger.error(s"Unable to find serving class with name '$name'" + + " defined in Engine.") + sys.exit(1) + } + val extractedParams = WorkflowUtils.extractParams( + engineLanguage, + compact(render(params)), + servingClassMap(name), + jsonExtractor) + (name, extractedParams) + } + + new EngineParams( + dataSourceParams = 
dataSourceParamsWithName, + preparatorParams = preparatorParamsWithName, + algorithmParamsList = algorithmsParamsWithNames, + servingParams = servingParamsWithName) + } +} + +/** This object contains concrete implementation for some methods of the + * [[Engine]] class. + * + * @group Engine + */ +object Engine { + private type EX = Int + private type AX = Int + private type QX = Long + + @transient lazy private val logger = Logger[this.type] + + /** Helper class to accept either a single data source, or a map of data + * sources, with a companion object providing implicit conversions, so + * using this class directly is not necessary. + * + * @tparam TD Training data class + * @tparam EI Evaluation information class + * @tparam Q Input query class + * @tparam A Actual result class + */ + class DataSourceMap[TD, EI, Q, A]( + val m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]) { + def this(c: Class[_ <: BaseDataSource[TD, EI, Q, A]]) = this(Map("" -> c)) + } + + /** Companion object providing implicit conversions, so using this directly + * is not necessary. + */ + object DataSourceMap { + implicit def cToMap[TD, EI, Q, A]( + c: Class[_ <: BaseDataSource[TD, EI, Q, A]]): + DataSourceMap[TD, EI, Q, A] = new DataSourceMap(c) + implicit def mToMap[TD, EI, Q, A]( + m: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]]): + DataSourceMap[TD, EI, Q, A] = new DataSourceMap(m) + } + + /** Helper class to accept either a single preparator, or a map of + * preparators, with a companion object providing implicit conversions, so + * using this class directly is not necessary. + * + * @tparam TD Training data class + * @tparam PD Prepared data class + */ + class PreparatorMap[TD, PD]( + val m: Map[String, Class[_ <: BasePreparator[TD, PD]]]) { + def this(c: Class[_ <: BasePreparator[TD, PD]]) = this(Map("" -> c)) + } + + /** Companion object providing implicit conversions, so using this directly + * is not necessary. 
+ */ + object PreparatorMap { + implicit def cToMap[TD, PD]( + c: Class[_ <: BasePreparator[TD, PD]]): + PreparatorMap[TD, PD] = new PreparatorMap(c) + implicit def mToMap[TD, PD]( + m: Map[String, Class[_ <: BasePreparator[TD, PD]]]): + PreparatorMap[TD, PD] = new PreparatorMap(m) + } + + /** Helper class to accept either a single serving, or a map of serving, with + * a companion object providing implicit conversions, so using this class + * directly is not necessary. + * + * @tparam Q Input query class + * @tparam P Predicted result class + */ + class ServingMap[Q, P]( + val m: Map[String, Class[_ <: BaseServing[Q, P]]]) { + def this(c: Class[_ <: BaseServing[Q, P]]) = this(Map("" -> c)) + } + + /** Companion object providing implicit conversions, so using this directly + * is not necessary. + */ + object ServingMap { + implicit def cToMap[Q, P]( + c: Class[_ <: BaseServing[Q, P]]): ServingMap[Q, P] = + new ServingMap(c) + implicit def mToMap[Q, P]( + m: Map[String, Class[_ <: BaseServing[Q, P]]]): ServingMap[Q, P] = + new ServingMap(m) + } + + /** Convenient method for returning an instance of [[Engine]]. + * + * @param dataSourceMap Accepts either an instance of Class of the data + * source, or a Map of data source classes (implicitly + * converted to [[DataSourceMap]]. + * @param preparatorMap Accepts either an instance of Class of the + * preparator, or a Map of preparator classes + * (implicitly converted to [[PreparatorMap]]. + * @param algorithmClassMap Accepts a Map of algorithm classes. + * @param servingMap Accepts either an instance of Class of the serving, or + * a Map of serving classes (implicitly converted to + * [[ServingMap]]. 
+ * @tparam TD Training data class + * @tparam EI Evaluation information class + * @tparam PD Prepared data class + * @tparam Q Input query class + * @tparam P Predicted result class + * @tparam A Actual result class + * @return An instance of [[Engine]] + */ + def apply[TD, EI, PD, Q, P, A]( + dataSourceMap: DataSourceMap[TD, EI, Q, A], + preparatorMap: PreparatorMap[TD, PD], + algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]], + servingMap: ServingMap[Q, P]): Engine[TD, EI, PD, Q, P, A] = new Engine( + dataSourceMap.m, + preparatorMap.m, + algorithmClassMap, + servingMap.m + ) + + /** Provides concrete implementation of training for [[Engine]]. + * + * @param sc An instance of SparkContext + * @param dataSource An instance of data source + * @param preparator An instance of preparator + * @param algorithmList A list of algorithm instances + * @param params An instance of [[WorkflowParams]] that controls the training + * process. + * @tparam TD Training data class + * @tparam PD Prepared data class + * @tparam Q Input query class + * @return A list of trained models + */ + def train[TD, PD, Q]( + sc: SparkContext, + dataSource: BaseDataSource[TD, _, Q, _], + preparator: BasePreparator[TD, PD], + algorithmList: Seq[BaseAlgorithm[PD, _, Q, _]], + params: WorkflowParams + ): Seq[Any] = { + logger.info("EngineWorkflow.train") + logger.info(s"DataSource: $dataSource") + logger.info(s"Preparator: $preparator") + logger.info(s"AlgorithmList: $algorithmList") + + if (params.skipSanityCheck) { + logger.info("Data sanity check is off.") + } else { + logger.info("Data sanity check is on.") + } + + val td = try { + dataSource.readTrainingBase(sc) + } catch { + case e: StorageClientException => + logger.error(s"Error occured reading from data source. 
(Reason: " + + e.getMessage + ") Please see the log for debugging details.", e) + sys.exit(1) + } + + if (!params.skipSanityCheck) { + td match { + case sanityCheckable: SanityCheck => { + logger.info(s"${td.getClass.getName} supports data sanity" + + " check. Performing check.") + sanityCheckable.sanityCheck() + } + case _ => { + logger.info(s"${td.getClass.getName} does not support" + + " data sanity check. Skipping check.") + } + } + } + + if (params.stopAfterRead) { + logger.info("Stopping here because --stop-after-read is set.") + throw StopAfterReadInterruption() + } + + val pd = preparator.prepareBase(sc, td) + + if (!params.skipSanityCheck) { + pd match { + case sanityCheckable: SanityCheck => { + logger.info(s"${pd.getClass.getName} supports data sanity" + + " check. Performing check.") + sanityCheckable.sanityCheck() + } + case _ => { + logger.info(s"${pd.getClass.getName} does not support" + + " data sanity check. Skipping check.") + } + } + } + + if (params.stopAfterPrepare) { + logger.info("Stopping here because --stop-after-prepare is set.") + throw StopAfterPrepareInterruption() + } + + val models: Seq[Any] = algorithmList.map(_.trainBase(sc, pd)) + + if (!params.skipSanityCheck) { + models.foreach { model => { + model match { + case sanityCheckable: SanityCheck => { + logger.info(s"${model.getClass.getName} supports data sanity" + + " check. Performing check.") + sanityCheckable.sanityCheck() + } + case _ => { + logger.info(s"${model.getClass.getName} does not support" + + " data sanity check. Skipping check.") + } + } + }} + } + + logger.info("EngineWorkflow.train completed") + models + } + + /** Provides concrete implementation of evaluation for [[Engine]]. 
+ * + * @param sc An instance of SparkContext + * @param dataSource An instance of data source + * @param preparator An instance of preparator + * @param algorithmList A list of algorithm instances + * @param serving An instance of serving + * @tparam TD Training data class + * @tparam PD Prepared data class + * @tparam Q Input query class + * @tparam P Predicted result class + * @tparam A Actual result class + * @tparam EI Evaluation information class + * @return A list of evaluation information, RDD of query, predicted result, + * and actual result tuple tuple. + */ + def eval[TD, PD, Q, P, A, EI]( + sc: SparkContext, + dataSource: BaseDataSource[TD, EI, Q, A], + preparator: BasePreparator[TD, PD], + algorithmList: Seq[BaseAlgorithm[PD, _, Q, P]], + serving: BaseServing[Q, P]): Seq[(EI, RDD[(Q, P, A)])] = { + logger.info(s"DataSource: $dataSource") + logger.info(s"Preparator: $preparator") + logger.info(s"AlgorithmList: $algorithmList") + logger.info(s"Serving: $serving") + + val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = algorithmList + .zipWithIndex + .map(_.swap) + .toMap + val algoCount = algoMap.size + + val evalTupleMap: Map[EX, (TD, EI, RDD[(Q, A)])] = dataSource + .readEvalBase(sc) + .zipWithIndex + .map(_.swap) + .toMap + + val evalCount = evalTupleMap.size + + val evalTrainMap: Map[EX, TD] = evalTupleMap.mapValues(_._1) + val evalInfoMap: Map[EX, EI] = evalTupleMap.mapValues(_._2) + val evalQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalTupleMap + .mapValues(_._3) + .mapValues{ _.zipWithUniqueId().map(_.swap) } + + val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td => { + preparator.prepareBase(sc, td) + }} + + val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd => { + algoMap.mapValues(_.trainBase(sc,pd)) + }} + + val suppQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalQAsMap.mapValues { qas => + qas.map { case (qx, (q, a)) => (qx, (serving.supplementBase(q), a)) } + } + + val algoPredictsMap: Map[EX, RDD[(QX, Seq[P])]] = (0 until 
evalCount) + .map { ex => { + val modelMap: Map[AX, Any] = algoModelsMap(ex) + + val qs: RDD[(QX, Q)] = suppQAsMap(ex).mapValues(_._1) + + val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount) + .map { ax => { + val algo = algoMap(ax) + val model = modelMap(ax) + val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(sc, model, qs) + val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { case (qx, p) => { + (qx, (ax, p)) + }} + predicts + }} + + val unionAlgoPredicts: RDD[(QX, Seq[P])] = sc.union(algoPredicts) + .groupByKey() + .mapValues { ps => { + assert (ps.size == algoCount, "Must have same length as algoCount") + // TODO. Check size == algoCount + ps.toSeq.sortBy(_._1).map(_._2) + }} + + (ex, unionAlgoPredicts) + }} + .toMap + + val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap + .map { case (ex, psMap) => { + // The query passed to serving.serve is the original one, not + // supplemented. + val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex) + val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap) + .map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) } + + val qpaMap: RDD[(Q, P, A)] = qpsaMap.map { + case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a) + } + (ex, qpaMap) + }} + + (0 until evalCount).map { ex => { + (evalInfoMap(ex), servingQPAMap(ex)) + }} + .toSeq + } +} + +/** Mix in this trait for queries that contain prId (PredictedResultId). + * This is useful when your engine expects queries to also be associated with + * prId keys when feedback loop is enabled. + * + * @group Helper + */ +@deprecated("To be removed in future releases.", "0.9.2") +trait WithPrId { + val prId: String = "" +}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala new file mode 100644 index 0000000..e9db35b --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/EngineFactory.scala @@ -0,0 +1,41 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseEngine + +import scala.language.implicitConversions + +/** If you intend to let PredictionIO create workflow and deploy serving + * automatically, you will need to implement an object that extends this class + * and return an [[Engine]]. + * + * @group Engine + */ +abstract class EngineFactory { + /** Creates an instance of an [[Engine]]. */ + def apply(): BaseEngine[_, _, _, _] + + /** Override this method to programmatically return engine parameters. */ + def engineParams(key: String): EngineParams = EngineParams() +} + +/** DEPRECATED. Use [[EngineFactory]] instead. 
+ * + * @group Engine + */ +@deprecated("Use EngineFactory instead.", "0.9.2") +trait IEngineFactory extends EngineFactory http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala new file mode 100644 index 0000000..b419255 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala @@ -0,0 +1,149 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseDataSource +import org.apache.predictionio.core.BaseAlgorithm + +import scala.collection.JavaConversions +import scala.language.implicitConversions + +/** This class serves as a logical grouping of all required engine's parameters. + * + * @param dataSourceParams Data Source name-parameters tuple. + * @param preparatorParams Preparator name-parameters tuple. + * @param algorithmParamsList List of algorithm name-parameter pairs. + * @param servingParams Serving name-parameters tuple. 
+ * @group Engine + */ +class EngineParams( + val dataSourceParams: (String, Params) = ("", EmptyParams()), + val preparatorParams: (String, Params) = ("", EmptyParams()), + val algorithmParamsList: Seq[(String, Params)] = Seq(), + val servingParams: (String, Params) = ("", EmptyParams())) + extends Serializable { + + /** Java-friendly constructor + * + * @param dataSourceName Data Source name + * @param dataSourceParams Data Source parameters + * @param preparatorName Preparator name + * @param preparatorParams Preparator parameters + * @param algorithmParamsList Map of algorithm name-parameters + * @param servingName Serving name + * @param servingParams Serving parameters + */ + def this( + dataSourceName: String, + dataSourceParams: Params, + preparatorName: String, + preparatorParams: Params, + algorithmParamsList: _root_.java.util.Map[String, _ <: Params], + servingName: String, + servingParams: Params) = { + + // To work around a json4s weird limitation, the parameter names can not be changed + this( + (dataSourceName, dataSourceParams), + (preparatorName, preparatorParams), + JavaConversions.mapAsScalaMap(algorithmParamsList).toSeq, + (servingName, servingParams) + ) + } + + // A case class style copy method. + def copy( + dataSourceParams: (String, Params) = dataSourceParams, + preparatorParams: (String, Params) = preparatorParams, + algorithmParamsList: Seq[(String, Params)] = algorithmParamsList, + servingParams: (String, Params) = servingParams): EngineParams = { + + new EngineParams( + dataSourceParams, + preparatorParams, + algorithmParamsList, + servingParams) + } +} + +/** Companion object for creating [[EngineParams]] instances. + * + * @group Engine + */ +object EngineParams { + /** Create EngineParams. 
+ * + * @param dataSourceName Data Source name + * @param dataSourceParams Data Source parameters + * @param preparatorName Preparator name + * @param preparatorParams Preparator parameters + * @param algorithmParamsList List of algorithm name-parameter pairs. + * @param servingName Serving name + * @param servingParams Serving parameters + */ + def apply( + dataSourceName: String = "", + dataSourceParams: Params = EmptyParams(), + preparatorName: String = "", + preparatorParams: Params = EmptyParams(), + algorithmParamsList: Seq[(String, Params)] = Seq(), + servingName: String = "", + servingParams: Params = EmptyParams()): EngineParams = { + new EngineParams( + dataSourceParams = (dataSourceName, dataSourceParams), + preparatorParams = (preparatorName, preparatorParams), + algorithmParamsList = algorithmParamsList, + servingParams = (servingName, servingParams) + ) + } +} + +/** SimpleEngine has only one algorithm, and uses default preparator and serving + * layer. Current default preparator is `IdentityPreparator` and serving is + * `FirstServing`. + * + * @tparam TD Training data class. + * @tparam EI Evaluation info class. + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @tparam A Actual value class. + * @param dataSourceClass Data source class. + * @param algorithmClass of algorithm names to classes. + * @group Engine + */ +class SimpleEngine[TD, EI, Q, P, A]( + dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]], + algorithmClass: Class[_ <: BaseAlgorithm[TD, _, Q, P]]) + extends Engine( + dataSourceClass, + IdentityPreparator(dataSourceClass), + Map("" -> algorithmClass), + LFirstServing(algorithmClass)) + +/** This shorthand class serves the `SimpleEngine` class. + * + * @param dataSourceParams Data source parameters. + * @param algorithmParams List of algorithm name-parameter pairs. 
+ * @group Engine + */ +class SimpleEngineParams( + dataSourceParams: Params = EmptyParams(), + algorithmParams: Params = EmptyParams()) + extends EngineParams( + dataSourceParams = ("", dataSourceParams), + algorithmParamsList = Seq(("", algorithmParams))) + + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala new file mode 100644 index 0000000..2e26b83 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParamsGenerator.scala @@ -0,0 +1,43 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import scala.language.implicitConversions + +/** Defines an engine parameters generator. + * + * Implementations of this trait can be supplied to "pio eval" as the second + * command line argument. + * + * @group Evaluation + */ +trait EngineParamsGenerator { + protected[this] var epList: Seq[EngineParams] = _ + protected[this] var epListSet: Boolean = false + + /** Returns the list of [[EngineParams]] of this [[EngineParamsGenerator]]. 
*/ + def engineParamsList: Seq[EngineParams] = { + assert(epListSet, "EngineParamsList not set") + epList + } + + /** Sets the list of [[EngineParams]] of this [[EngineParamsGenerator]]. */ + def engineParamsList_=(l: Seq[EngineParams]) { + assert(!epListSet, "EngineParamsList can bet set at most once") + epList = Seq(l:_*) + epListSet = true + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala b/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala new file mode 100644 index 0000000..c720c4f --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/Evaluation.scala @@ -0,0 +1,122 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseEngine +import org.apache.predictionio.core.BaseEvaluator +import org.apache.predictionio.core.BaseEvaluatorResult + +import scala.language.implicitConversions + +/** Defines an evaluation that contains an engine and a metric. + * + * Implementations of this trait can be supplied to "pio eval" as the first + * argument. 
+ * + * @group Evaluation + */ +trait Evaluation extends Deployment { + protected [this] var _evaluatorSet: Boolean = false + protected [this] var _evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = _ + + private [prediction] + def evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = { + assert(_evaluatorSet, "Evaluator not set") + _evaluator + } + + /** Gets the tuple of the [[Engine]] and the implementation of + * [[org.apache.predictionio.core.BaseEvaluator]] + */ + def engineEvaluator + : (BaseEngine[_, _, _, _], BaseEvaluator[_, _, _, _, _]) = { + assert(_evaluatorSet, "Evaluator not set") + (engine, _evaluator) + } + + /** Sets both an [[Engine]] and an implementation of + * [[org.apache.predictionio.core.BaseEvaluator]] for this [[Evaluation]] + * + * @param engineEvaluator A tuple an [[Engine]] and an implementation of + * [[org.apache.predictionio.core.BaseEvaluator]] + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + * @tparam R Metric result class + */ + def engineEvaluator_=[EI, Q, P, A, R <: BaseEvaluatorResult]( + engineEvaluator: ( + BaseEngine[EI, Q, P, A], + BaseEvaluator[EI, Q, P, A, R])) { + assert(!_evaluatorSet, "Evaluator can be set at most once") + engine = engineEvaluator._1 + _evaluator = engineEvaluator._2 + _evaluatorSet = true + } + + /** Returns both the [[Engine]] and the implementation of [[Metric]] for this + * [[Evaluation]] + */ + def engineMetric: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = { + throw new NotImplementedError("This method is to keep the compiler happy") + } + + /** Sets both an [[Engine]] and an implementation of [[Metric]] for this + * [[Evaluation]] + * + * @param engineMetric A tuple of [[Engine]] and an implementation of + * [[Metric]] + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + */ + def 
engineMetric_=[EI, Q, P, A]( + engineMetric: (BaseEngine[EI, Q, P, A], Metric[EI, Q, P, A, _])) { + engineEvaluator = ( + engineMetric._1, + MetricEvaluator( + metric = engineMetric._2, + otherMetrics = Seq[Metric[EI, Q, P, A, _]](), + outputPath = "best.json")) + } + + private [prediction] + def engineMetrics: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = { + throw new NotImplementedError("This method is to keep the compiler happy") + } + + /** Sets an [[Engine]], an implementation of [[Metric]], and sequence of + * implementations of [[Metric]] for this [[Evaluation]] + * + * @param engineMetrics A tuple of [[Engine]], an implementation of + * [[Metric]] and sequence of implementations of [[Metric]] + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + */ + def engineMetrics_=[EI, Q, P, A]( + engineMetrics: ( + BaseEngine[EI, Q, P, A], + Metric[EI, Q, P, A, _], + Seq[Metric[EI, Q, P, A, _]])) { + engineEvaluator = ( + engineMetrics._1, + MetricEvaluator(engineMetrics._2, engineMetrics._3)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala new file mode 100644 index 0000000..868d818 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala @@ -0,0 +1,343 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseDataSource +import org.apache.predictionio.core.BasePreparator +import org.apache.predictionio.core.BaseAlgorithm +import org.apache.predictionio.core.BaseServing +import org.apache.predictionio.core.Doer +import org.apache.predictionio.annotation.Experimental + +import grizzled.slf4j.Logger +import org.apache.predictionio.workflow.WorkflowParams +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import scala.language.implicitConversions + +import _root_.java.util.NoSuchElementException + +import scala.collection.mutable.{ HashMap => MutableHashMap } + +/** :: Experimental :: + * Workflow based on [[FastEvalEngine]] + * + * @group Evaluation + */ +@Experimental +object FastEvalEngineWorkflow { + @transient lazy val logger = Logger[this.type] + + type EX = Int + type AX = Int + type QX = Long + + case class DataSourcePrefix(dataSourceParams: (String, Params)) { + def this(pp: PreparatorPrefix) = this(pp.dataSourceParams) + def this(ap: AlgorithmsPrefix) = this(ap.dataSourceParams) + def this(sp: ServingPrefix) = this(sp.dataSourceParams) + } + + case class PreparatorPrefix( + dataSourceParams: (String, Params), + preparatorParams: (String, Params)) { + def this(ap: AlgorithmsPrefix) = { + this(ap.dataSourceParams, ap.preparatorParams) + } + } + + case class AlgorithmsPrefix( + dataSourceParams: (String, Params), + preparatorParams: (String, Params), + algorithmParamsList: Seq[(String, Params)]) 
{ + def this(sp: ServingPrefix) = { + this(sp.dataSourceParams, sp.preparatorParams, sp.algorithmParamsList) + } + } + + case class ServingPrefix( + dataSourceParams: (String, Params), + preparatorParams: (String, Params), + algorithmParamsList: Seq[(String, Params)], + servingParams: (String, Params)) { + def this(ep: EngineParams) = this( + ep.dataSourceParams, + ep.preparatorParams, + ep.algorithmParamsList, + ep.servingParams) + } + + def getDataSourceResult[TD, EI, PD, Q, P, A]( + workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], + prefix: DataSourcePrefix) + : Map[EX, (TD, EI, RDD[(QX, (Q, A))])] = { + val cache = workflow.dataSourceCache + + if (!cache.contains(prefix)) { + val dataSource = Doer( + workflow.engine.dataSourceClassMap(prefix.dataSourceParams._1), + prefix.dataSourceParams._2) + + val result = dataSource + .readEvalBase(workflow.sc) + .map { case (td, ei, qaRDD) => { + (td, ei, qaRDD.zipWithUniqueId().map(_.swap)) + }} + .zipWithIndex + .map(_.swap) + .toMap + + cache += Tuple2(prefix, result) + } + cache(prefix) + } + + def getPreparatorResult[TD, EI, PD, Q, P, A]( + workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], + prefix: PreparatorPrefix): Map[EX, PD] = { + val cache = workflow.preparatorCache + + if (!cache.contains(prefix)) { + val preparator = Doer( + workflow.engine.preparatorClassMap(prefix.preparatorParams._1), + prefix.preparatorParams._2) + + val result = getDataSourceResult( + workflow = workflow, + prefix = new DataSourcePrefix(prefix)) + .mapValues { case (td, _, _) => preparator.prepareBase(workflow.sc, td) } + + cache += Tuple2(prefix, result) + } + cache(prefix) + } + + def computeAlgorithmsResult[TD, EI, PD, Q, P, A]( + workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], + prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = { + + val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = prefix.algorithmParamsList + .map { case (algoName, algoParams) => { + try { + Doer(workflow.engine.algorithmClassMap(algoName), 
algoParams) + } catch { + case e: NoSuchElementException => { + val algorithmClassMap = workflow.engine.algorithmClassMap + if (algoName == "") { + logger.error("Empty algorithm name supplied but it could not " + + "match with any algorithm in the engine's definition. " + + "Existing algorithm name(s) are: " + + s"${algorithmClassMap.keys.mkString(", ")}. Aborting.") + } else { + logger.error(s"${algoName} cannot be found in the engine's " + + "definition. Existing algorithm name(s) are: " + + s"${algorithmClassMap.keys.mkString(", ")}. Aborting.") + } + sys.exit(1) + } + } + }} + .zipWithIndex + .map(_.swap) + .toMap + + val algoCount = algoMap.size + + // Model Train + val algoModelsMap: Map[EX, Map[AX, Any]] = getPreparatorResult( + workflow, + new PreparatorPrefix(prefix)) + .mapValues { + pd => algoMap.mapValues(_.trainBase(workflow.sc,pd)) + } + + // Predict + val dataSourceResult = + FastEvalEngineWorkflow.getDataSourceResult( + workflow = workflow, + prefix = new DataSourcePrefix(prefix)) + + val algoResult: Map[EX, RDD[(QX, Seq[P])]] = dataSourceResult + .par + .map { case (ex, (td, ei, iqaRDD)) => { + val modelsMap: Map[AX, Any] = algoModelsMap(ex) + val qs: RDD[(QX, Q)] = iqaRDD.mapValues(_._1) + + val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount) + .map { ax => { + val algo = algoMap(ax) + val model = modelsMap(ax) + val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase( + workflow.sc, + model, + qs) + + val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { + case (qx, p) => (qx, (ax, p)) + } + predicts + }} + + val unionAlgoPredicts: RDD[(QX, Seq[P])] = workflow.sc + .union(algoPredicts) + .groupByKey + .mapValues { ps => { + assert (ps.size == algoCount, "Must have same length as algoCount") + // TODO. 
Check size == algoCount + ps.toSeq.sortBy(_._1).map(_._2) + }} + (ex, unionAlgoPredicts) + }} + .seq + .toMap + + algoResult + } + + def getAlgorithmsResult[TD, EI, PD, Q, P, A]( + workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], + prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = { + val cache = workflow.algorithmsCache + if (!cache.contains(prefix)) { + val result = computeAlgorithmsResult(workflow, prefix) + cache += Tuple2(prefix, result) + } + cache(prefix) + } + + def getServingResult[TD, EI, PD, Q, P, A]( + workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], + prefix: ServingPrefix) + : Seq[(EI, RDD[(Q, P, A)])] = { + val cache = workflow.servingCache + if (!cache.contains(prefix)) { + val serving = Doer( + workflow.engine.servingClassMap(prefix.servingParams._1), + prefix.servingParams._2) + + val algoPredictsMap = getAlgorithmsResult( + workflow = workflow, + prefix = new AlgorithmsPrefix(prefix)) + + val dataSourceResult = getDataSourceResult( + workflow = workflow, + prefix = new DataSourcePrefix(prefix)) + + val evalQAsMap = dataSourceResult.mapValues(_._3) + val evalInfoMap = dataSourceResult.mapValues(_._2) + + val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap + .map { case (ex, psMap) => { + val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex) + val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap) + .map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) } + + val qpaMap: RDD[(Q, P, A)] = qpsaMap.map { + case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a) + } + (ex, qpaMap) + }} + + val servingResult = (0 until evalQAsMap.size).map { ex => { + (evalInfoMap(ex), servingQPAMap(ex)) + }} + .toSeq + + cache += Tuple2(prefix, servingResult) + } + cache(prefix) + } + + def get[TD, EI, PD, Q, P, A]( + workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], + engineParamsList: Seq[EngineParams]) + : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = { + engineParamsList.map { engineParams => { + (engineParams, + 
getServingResult(workflow, new ServingPrefix(engineParams))) + }} + } +} + +/** :: Experimental :: + * Workflow based on [[FastEvalEngine]] + * + * @group Evaluation + */ +@Experimental +class FastEvalEngineWorkflow[TD, EI, PD, Q, P, A]( + val engine: FastEvalEngine[TD, EI, PD, Q, P, A], + val sc: SparkContext, + val workflowParams: WorkflowParams) extends Serializable { + + import org.apache.predictionio.controller.FastEvalEngineWorkflow._ + + type DataSourceResult = Map[EX, (TD, EI, RDD[(QX, (Q, A))])] + type PreparatorResult = Map[EX, PD] + type AlgorithmsResult = Map[EX, RDD[(QX, Seq[P])]] + type ServingResult = Seq[(EI, RDD[(Q, P, A)])] + + val dataSourceCache = MutableHashMap[DataSourcePrefix, DataSourceResult]() + val preparatorCache = MutableHashMap[PreparatorPrefix, PreparatorResult]() + val algorithmsCache = MutableHashMap[AlgorithmsPrefix, AlgorithmsResult]() + val servingCache = MutableHashMap[ServingPrefix, ServingResult]() +} + + + +/** :: Experimental :: + * FastEvalEngine is a subclass of [[Engine]] that exploits the immutability of + * controllers to optimize the evaluation process + * + * @group Evaluation + */ +@Experimental +class FastEvalEngine[TD, EI, PD, Q, P, A]( + dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]], + preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]], + algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]], + servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]]) + extends Engine[TD, EI, PD, Q, P, A]( + dataSourceClassMap, + preparatorClassMap, + algorithmClassMap, + servingClassMap) { + @transient override lazy val logger = Logger[this.type] + + override def eval( + sc: SparkContext, + engineParams: EngineParams, + params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] = { + logger.info("FastEvalEngine.eval") + batchEval(sc, Seq(engineParams), params).head._2 + } + + override def batchEval( + sc: SparkContext, + engineParamsList: Seq[EngineParams], + params: 
WorkflowParams) + : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = { + + val fastEngineWorkflow = new FastEvalEngineWorkflow( + this, sc, params) + + FastEvalEngineWorkflow.get( + fastEngineWorkflow, + engineParamsList) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala new file mode 100644 index 0000000..c7669ba --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala @@ -0,0 +1,92 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseDataSource +import org.apache.predictionio.core.BasePreparator +import org.apache.spark.SparkContext + +import scala.reflect._ + +/** A helper concrete implementation of [[org.apache.predictionio.core.BasePreparator]] + * that passes training data through without any special preparation. This can + * be used in place for both [[PPreparator]] and [[LPreparator]]. + * + * @tparam TD Training data class. 
+ * @group Preparator + */ +class IdentityPreparator[TD] extends BasePreparator[TD, TD] { + def prepareBase(sc: SparkContext, td: TD): TD = td +} + +/** Companion object of [[IdentityPreparator]] that conveniently returns an + * instance of the class of [[IdentityPreparator]] for use with + * [[EngineFactory]]. + * + * @group Preparator + */ +object IdentityPreparator { + /** Produces an instance of the class of [[IdentityPreparator]]. + * + * @param ds Instance of the class of the data source for this preparator. + */ + def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] = + classOf[IdentityPreparator[TD]] +} + +/** DEPRECATED. Use [[IdentityPreparator]] instead. + * + * @tparam TD Training data class. + * @group Preparator + */ +@deprecated("Use IdentityPreparator instead.", "0.9.2") +class PIdentityPreparator[TD] extends IdentityPreparator[TD] + +/** DEPRECATED. Use [[IdentityPreparator]] instead. + * + * @group Preparator + */ +@deprecated("Use IdentityPreparator instead.", "0.9.2") +object PIdentityPreparator { + /** Produces an instance of the class of [[IdentityPreparator]]. + * + * @param ds Instance of the class of the data source for this preparator. + */ + def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] = + classOf[IdentityPreparator[TD]] +} + +/** DEPRECATED. Use [[IdentityPreparator]] instead. + * + * @tparam TD Training data class. + * @group Preparator + */ +@deprecated("Use IdentityPreparator instead.", "0.9.2") +class LIdentityPreparator[TD] extends IdentityPreparator[TD] + +/** DEPRECATED. Use [[IdentityPreparator]] instead. + * + * @group Preparator + */ +@deprecated("Use IdentityPreparator instead.", "0.9.2") +object LIdentityPreparator { + /** Produces an instance of the class of [[IdentityPreparator]]. + * + * @param ds Instance of the class of the data source for this preparator. 
+ */ + def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] = + classOf[IdentityPreparator[TD]] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala new file mode 100644 index 0000000..664ebb7 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala @@ -0,0 +1,130 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import _root_.org.apache.predictionio.annotation.DeveloperApi +import org.apache.predictionio.core.BaseAlgorithm +import org.apache.predictionio.workflow.PersistentModelManifest +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +import scala.reflect._ + +/** Base class of a local algorithm. + * + * A local algorithm runs locally within a single machine and produces a model + * that can fit within a single machine. 
+ * + * If your input query class requires custom JSON4S serialization, the most + * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]], + * and mix that into your algorithm class, instead of overriding + * [[querySerializer]] directly. + * + * @tparam PD Prepared data class. + * @tparam M Trained model class. + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Algorithm + */ +abstract class LAlgorithm[PD, M : ClassTag, Q, P] + extends BaseAlgorithm[RDD[PD], RDD[M], Q, P] { + + def trainBase(sc: SparkContext, pd: RDD[PD]): RDD[M] = pd.map(train) + + /** Implement this method to produce a model from prepared data. + * + * @param pd Prepared data for model training. + * @return Trained model. + */ + def train(pd: PD): M + + def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]) + : RDD[(Long, P)] = { + val mRDD = bm.asInstanceOf[RDD[M]] + batchPredict(mRDD, qs) + } + + /** This is a default implementation to perform batch prediction. Override + * this method for a custom implementation. + * + * @param mRDD A single model wrapped inside an RDD + * @param qs An RDD of index-query tuples. The index is used to keep track of + * predicted results with corresponding queries. + * @return Batch of predicted results + */ + def batchPredict(mRDD: RDD[M], qs: RDD[(Long, Q)]): RDD[(Long, P)] = { + val glomQs: RDD[Array[(Long, Q)]] = qs.glom() + val cartesian: RDD[(M, Array[(Long, Q)])] = mRDD.cartesian(glomQs) + cartesian.flatMap { case (m, qArray) => + qArray.map { case (qx, q) => (qx, predict(m, q)) } + } + } + + def predictBase(localBaseModel: Any, q: Q): P = { + predict(localBaseModel.asInstanceOf[M], q) + } + + /** Implement this method to produce a prediction from a query and trained + * model. + * + * @param m Trained model produced by [[train]]. + * @param q An input query. + * @return A prediction. 
+ */ + def predict(m: M, q: Q): P + + /** :: DeveloperApi :: + * Engine developers should not use this directly (read on to see how local + * algorithm models are persisted). + * + * Local algorithms produce local models. By default, models will be + * serialized and stored automatically. Engine developers can override this behavior by + * mixing the [[PersistentModel]] trait into the model class, and + * PredictionIO will call [[PersistentModel.save]] instead. If it returns + * true, a [[org.apache.predictionio.workflow.PersistentModelManifest]] will be + * returned so that during deployment, PredictionIO will use + * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be + * returned and the model will be re-trained on-the-fly. + * + * @param sc Spark context + * @param modelId Model ID + * @param algoParams Algorithm parameters that trained this model + * @param bm Model + * @return The model itself for automatic persistence, an instance of + * [[org.apache.predictionio.workflow.PersistentModelManifest]] for manual + * persistence, or Unit for re-training on deployment + */ + @DeveloperApi + override + def makePersistentModel( + sc: SparkContext, + modelId: String, + algoParams: Params, + bm: Any): Any = { + // Check RDD[M].count == 1 + val m = bm.asInstanceOf[RDD[M]].first() + if (m.isInstanceOf[PersistentModel[_]]) { + if (m.asInstanceOf[PersistentModel[Params]].save( + modelId, algoParams, sc)) { + PersistentModelManifest(className = m.getClass.getName) + } else { + Unit + } + } else { + m + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala new file mode 100644 index 0000000..7fbe7ac --- 
/dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/LAverageServing.scala @@ -0,0 +1,41 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseAlgorithm + +/** A concrete implementation of [[LServing]] returning the average of all + * algorithms' predictions, where their classes are expected to be all Double. + * + * @group Serving + */ +class LAverageServing[Q] extends LServing[Q, Double] { + /** Returns the average of all algorithms' predictions. */ + def serve(query: Q, predictions: Seq[Double]): Double = { + predictions.sum / predictions.length + } +} + +/** A concrete implementation of [[LServing]] returning the average of all + * algorithms' predictions, where their classes are expected to be all Double. + * + * @group Serving + */ +object LAverageServing { + /** Returns an instance of [[LAverageServing]]. 
*/ + def apply[Q](a: Class[_ <: BaseAlgorithm[_, _, Q, _]]): Class[LAverageServing[Q]] = + classOf[LAverageServing[Q]] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala b/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala new file mode 100644 index 0000000..adb8e20 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/LDataSource.scala @@ -0,0 +1,67 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseDataSource +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +import scala.reflect._ + +/** Base class of a local data source. + * + * A local data source runs locally within a single machine and return data + * that can fit within a single machine. + * + * @tparam TD Training data class. + * @tparam EI Evaluation Info class. + * @tparam Q Input query class. + * @tparam A Actual value class. 
+ * @group Data Source + */ +abstract class LDataSource[TD: ClassTag, EI, Q, A] + extends BaseDataSource[RDD[TD], EI, Q, A] { + + def readTrainingBase(sc: SparkContext): RDD[TD] = { + sc.parallelize(Seq(None)).map(_ => readTraining()) + } + + /** Implement this method to only return training data from a data source */ + def readTraining(): TD + + def readEvalBase(sc: SparkContext): Seq[(RDD[TD], EI, RDD[(Q, A)])] = { + val localEvalData: Seq[(TD, EI, Seq[(Q, A)])] = readEval() + + localEvalData.map { case (td, ei, qaSeq) => { + val tdRDD = sc.parallelize(Seq(None)).map(_ => td) + val qaRDD = sc.parallelize(qaSeq) + (tdRDD, ei, qaRDD) + }} + } + + /** To provide evaluation feature for your engine, your must override this + * method to return data for evaluation from a data source. Returned data can + * optionally include a sequence of query and actual value pairs for + * evaluation purpose. + * + * The default implementation returns an empty sequence as a stub, so that + * an engine can be compiled without implementing evaluation. + */ + def readEval(): Seq[(TD, EI, Seq[(Q, A)])] = Seq[(TD, EI, Seq[(Q, A)])]() + + @deprecated("Use readEval() instead.", "0.9.0") + def read(): Seq[(TD, EI, Seq[(Q, A)])] = readEval() +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala new file mode 100644 index 0000000..e677743 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/LFirstServing.scala @@ -0,0 +1,39 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseAlgorithm + +/** A concrete implementation of [[LServing]] returning the first algorithm's + * prediction result directly without any modification. + * + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Serving + */ +class LFirstServing[Q, P] extends LServing[Q, P] { + /** Returns the first algorithm's prediction unchanged; the query is ignored. + * + * @param query Original input query (unused). + * @param predictions A list of algorithms' predictions. + * @return `predictions.head` + * @throws NoSuchElementException if `predictions` is empty. + */ + def serve(query: Q, predictions: Seq[P]): P = predictions.head +} + +/** Companion object of [[LFirstServing]], used to obtain its class from an + * algorithm class. + * + * @group Serving + */ +object LFirstServing { + /** Returns the class of [[LFirstServing]] (not an instance of it) whose + * type parameters match those of the given algorithm class. + * + * @param a Algorithm class. It is not used at runtime; it only lets the + * compiler infer the query type `Q` and prediction type `P`. + * @return `classOf[LFirstServing[Q, P]]` + */ + def apply[Q, P](a: Class[_ <: BaseAlgorithm[_, _, Q, P]]): Class[LFirstServing[Q, P]] = + classOf[LFirstServing[Q, P]] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala new file mode 100644 index 0000000..32ffd5d --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/LPreparator.scala @@ -0,0 +1,46 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BasePreparator +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +import scala.reflect._ + +/** Base class of a local preparator. + * + * A local preparator runs within a single machine and produces prepared + * data that fits within that single machine. + * + * @tparam TD Training data class. + * @tparam PD Prepared data class. + * @group Preparator + */ +abstract class LPreparator[TD, PD : ClassTag] + extends BasePreparator[RDD[TD], RDD[PD]] { + + /** Applies [[prepare]] to every element of `rddTd`. + * + * @param sc Spark context (not used by this implementation). + * @param rddTd Training data to prepare. + * @return The prepared data, one element per training data element. + */ + def prepareBase(sc: SparkContext, rddTd: RDD[TD]): RDD[PD] = + rddTd.map { trainingData => prepare(trainingData) } + + /** Implement this method to turn training data into prepared data that is + * ready for model training. + * + * @param trainingData Training data to be prepared. + * @return Prepared data. + */ + def prepare(trainingData: TD): PD +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LServing.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LServing.scala b/core/src/main/scala/org/apache/predictionio/controller/LServing.scala new file mode 100644 index 0000000..653b998 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/LServing.scala @@ -0,0 +1,52 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.annotation.Experimental +import org.apache.predictionio.core.BaseServing + +/** Base class of serving. + * + * @tparam Q Input query class. + * @tparam P Output prediction class. + * @group Serving + */ +abstract class LServing[Q, P] extends BaseServing[Q, P] { + /** Delegates to [[supplement]]. */ + def supplementBase(q: Q): Q = { + supplement(q) + } + + /** :: Experimental :: + * Implement this method to supplement the query before it is sent to the + * algorithms. The default implementation returns the query unchanged. + * + * @param q Query + * @return A supplemented Query + */ + @Experimental + def supplement(q: Q): Q = q + + /** Delegates to [[serve]] with the given query and predictions. */ + def serveBase(q: Q, ps: Seq[P]): P = serve(q, ps) + + /** Implement this method to combine multiple algorithms' predictions into + * a single final prediction. The query passed in is the original query sent + * to the engine, not the supplemented query returned by + * [[LServing.supplement]]. + * + * @param query Original input query. + * @param predictions A list of algorithms' predictions.
+ * @return The final prediction. + */ + def serve(query: Q, predictions: Seq[P]): P +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala b/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala new file mode 100644 index 0000000..f90e28d --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/LocalFileSystemPersistentModel.scala @@ -0,0 +1,74 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.spark.SparkContext + +/** This trait is a convenience helper for persisting your model to the local + * filesystem. This trait and [[LocalFileSystemPersistentModelLoader]] contain + * concrete implementation and need not be implemented. + * + * The underlying implementation is [[Utils.save]]. + * + * {{{ + * class MyModel extends LocalFileSystemPersistentModel[MyParams] { + * ... + * } + * + * object MyModel extends LocalFileSystemPersistentModelLoader[MyParams, MyModel] { + * ... + * } + * }}} + * + * @tparam AP Algorithm parameters class.
+ * @see [[LocalFileSystemPersistentModelLoader]] + * @group Algorithm + */ +trait LocalFileSystemPersistentModel[AP <: Params] extends PersistentModel[AP] { + /** Saves this model under `id` by calling [[Utils.save]]. `params` and `sc` + * are not used by this implementation. + * + * @return `true` once [[Utils.save]] has returned. + */ + def save(id: String, params: AP, sc: SparkContext): Boolean = { + Utils.save(id, this) + true + } +} + +/** Implement an object that extends this trait for PredictionIO to support + * loading a persisted model from the local filesystem during serving + * deployment. + * + * The underlying implementation is [[Utils.load]]. + * + * @tparam AP Algorithm parameters class. + * @tparam M Model class. + * @see [[LocalFileSystemPersistentModel]] + * @group Algorithm + */ +trait LocalFileSystemPersistentModelLoader[AP <: Params, M] + extends PersistentModelLoader[AP, M] { + /** Loads the model stored under `id` via [[Utils.load]] and casts it to `M`. + * `params` and `sc` are not used by this implementation. + * + * @return The loaded model. The cast to the type parameter `M` is unchecked + * at runtime. + */ + def apply(id: String, params: AP, sc: Option[SparkContext]): M = + Utils.load(id).asInstanceOf[M] +} + +/** DEPRECATED alias of [[LocalFileSystemPersistentModel]]; use that trait + * instead. + * + * @group Algorithm + */ +@deprecated("Use LocalFileSystemPersistentModel instead.", "0.9.2") +trait IFSPersistentModel[AP <: Params] extends LocalFileSystemPersistentModel[AP] + +/** DEPRECATED alias of [[LocalFileSystemPersistentModelLoader]]; use that + * trait instead. + * + * @group Algorithm + */ +@deprecated("Use LocalFileSystemPersistentModelLoader instead.", "0.9.2") +trait IFSPersistentModelLoader[AP <: Params, M] extends LocalFileSystemPersistentModelLoader[AP, M]
