// Provenance (commit-notification diff header, kept for reference):
//   http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/EngineParams.scala
//   diff --git a/core/src/main/scala/io/prediction/controller/EngineParams.scala b/core/src/main/scala/io/prediction/controller/EngineParams.scala
//   deleted file mode 100644
//   index 32f5de7..0000000
//   --- a/core/src/main/scala/io/prediction/controller/EngineParams.scala
//   +++ /dev/null
//   @@ -1,149 +0,0 @@

/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller

import io.prediction.core.BaseDataSource
import io.prediction.core.BaseAlgorithm

import scala.collection.JavaConverters
import scala.language.implicitConversions

/** This class serves as a logical grouping of all required engine's parameters.
  *
  * Each component is identified by a name-parameters tuple; the empty string
  * is the default name used throughout this file.
  *
  * @param dataSourceParams Data Source name-parameters tuple.
  * @param preparatorParams Preparator name-parameters tuple.
  * @param algorithmParamsList List of algorithm name-parameter pairs.
  * @param servingParams Serving name-parameters tuple.
  * @group Engine
  */
class EngineParams(
    val dataSourceParams: (String, Params) = ("", EmptyParams()),
    val preparatorParams: (String, Params) = ("", EmptyParams()),
    val algorithmParamsList: Seq[(String, Params)] = Seq(),
    val servingParams: (String, Params) = ("", EmptyParams()))
  extends Serializable {

  /** Java-friendly constructor
    *
    * The algorithm map is converted to a sequence of name-parameter pairs in
    * the iteration order of the supplied map, so callers that care about
    * algorithm order should pass an ordered map implementation.
    *
    * @param dataSourceName Data Source name
    * @param dataSourceParams Data Source parameters
    * @param preparatorName Preparator name
    * @param preparatorParams Preparator parameters
    * @param algorithmParamsList Map of algorithm name-parameters
    * @param servingName Serving name
    * @param servingParams Serving parameters
    */
  def this(
    dataSourceName: String,
    dataSourceParams: Params,
    preparatorName: String,
    preparatorParams: Params,
    algorithmParamsList: _root_.java.util.Map[String, _ <: Params],
    servingName: String,
    servingParams: Params) = {

    // To work around a json4s weird limitation, the parameter names can not be changed
    this(
      (dataSourceName, dataSourceParams),
      (preparatorName, preparatorParams),
      // Explicit decorator instead of the deprecated JavaConversions helper.
      JavaConverters.mapAsScalaMapConverter(algorithmParamsList).asScala.toSeq,
      (servingName, servingParams)
    )
  }

  /** A case class style copy method: every argument defaults to the current
    * value, so callers only name the components they want to replace.
    *
    * @param dataSourceParams Data Source name-parameters tuple.
    * @param preparatorParams Preparator name-parameters tuple.
    * @param algorithmParamsList List of algorithm name-parameter pairs.
    * @param servingParams Serving name-parameters tuple.
    * @return A new [[EngineParams]] carrying the given components.
    */
  def copy(
    dataSourceParams: (String, Params) = dataSourceParams,
    preparatorParams: (String, Params) = preparatorParams,
    algorithmParamsList: Seq[(String, Params)] = algorithmParamsList,
    servingParams: (String, Params) = servingParams): EngineParams = {

    new EngineParams(
      dataSourceParams,
      preparatorParams,
      algorithmParamsList,
      servingParams)
  }
}

/** Companion object for creating [[EngineParams]] instances.
  *
  * @group Engine
  */
object EngineParams {
  /** Create EngineParams.
    *
    * Names default to the empty string and parameters to `EmptyParams()`.
    *
    * @param dataSourceName Data Source name
    * @param dataSourceParams Data Source parameters
    * @param preparatorName Preparator name
    * @param preparatorParams Preparator parameters
    * @param algorithmParamsList List of algorithm name-parameter pairs.
    * @param servingName Serving name
    * @param servingParams Serving parameters
    */
  def apply(
    dataSourceName: String = "",
    dataSourceParams: Params = EmptyParams(),
    preparatorName: String = "",
    preparatorParams: Params = EmptyParams(),
    algorithmParamsList: Seq[(String, Params)] = Seq(),
    servingName: String = "",
    servingParams: Params = EmptyParams()): EngineParams = {
    new EngineParams(
      dataSourceParams = (dataSourceName, dataSourceParams),
      preparatorParams = (preparatorName, preparatorParams),
      algorithmParamsList = algorithmParamsList,
      servingParams = (servingName, servingParams)
    )
  }
}

/** SimpleEngine has only one algorithm, and uses default preparator and serving
  * layer. Current default preparator is `IdentityPreparator` and serving is
  * `LFirstServing`. The single algorithm is registered under the empty name.
  *
  * @tparam TD Training data class.
  * @tparam EI Evaluation info class.
  * @tparam Q Input query class.
  * @tparam P Output prediction class.
  * @tparam A Actual value class.
  * @param dataSourceClass Data source class.
  * @param algorithmClass Algorithm class.
  * @group Engine
  */
class SimpleEngine[TD, EI, Q, P, A](
    dataSourceClass: Class[_ <: BaseDataSource[TD, EI, Q, A]],
    algorithmClass: Class[_ <: BaseAlgorithm[TD, _, Q, P]])
  extends Engine(
    dataSourceClass,
    IdentityPreparator(dataSourceClass),
    Map("" -> algorithmClass),
    LFirstServing(algorithmClass))

/** This shorthand class serves the `SimpleEngine` class.
  *
  * Both parameter objects are registered under the empty name; preparator and
  * serving parameters keep the [[EngineParams]] defaults.
  *
  * @param dataSourceParams Data source parameters.
  * @param algorithmParams Parameters of the single algorithm.
  * @group Engine
  */
class SimpleEngineParams(
    dataSourceParams: Params = EmptyParams(),
    algorithmParams: Params = EmptyParams())
  extends EngineParams(
    dataSourceParams = ("", dataSourceParams),
    algorithmParamsList = Seq(("", algorithmParams)))
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala b/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala deleted file mode 100644 index a9bf3eb..0000000 --- a/core/src/main/scala/io/prediction/controller/EngineParamsGenerator.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import scala.language.implicitConversions - -/** Defines an engine parameters generator. - * - * Implementations of this trait can be supplied to "pio eval" as the second - * command line argument. - * - * @group Evaluation - */ -trait EngineParamsGenerator { - protected[this] var epList: Seq[EngineParams] = _ - protected[this] var epListSet: Boolean = false - - /** Returns the list of [[EngineParams]] of this [[EngineParamsGenerator]]. */ - def engineParamsList: Seq[EngineParams] = { - assert(epListSet, "EngineParamsList not set") - epList - } - - /** Sets the list of [[EngineParams]] of this [[EngineParamsGenerator]]. 
*/ - def engineParamsList_=(l: Seq[EngineParams]) { - assert(!epListSet, "EngineParamsList can bet set at most once") - epList = Seq(l:_*) - epListSet = true - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Evaluation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/Evaluation.scala b/core/src/main/scala/io/prediction/controller/Evaluation.scala deleted file mode 100644 index a6ee9a7..0000000 --- a/core/src/main/scala/io/prediction/controller/Evaluation.scala +++ /dev/null @@ -1,122 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import io.prediction.core.BaseEngine -import io.prediction.core.BaseEvaluator -import io.prediction.core.BaseEvaluatorResult - -import scala.language.implicitConversions - -/** Defines an evaluation that contains an engine and a metric. - * - * Implementations of this trait can be supplied to "pio eval" as the first - * argument. 
- * - * @group Evaluation - */ -trait Evaluation extends Deployment { - protected [this] var _evaluatorSet: Boolean = false - protected [this] var _evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = _ - - private [prediction] - def evaluator: BaseEvaluator[_, _, _, _, _ <: BaseEvaluatorResult] = { - assert(_evaluatorSet, "Evaluator not set") - _evaluator - } - - /** Gets the tuple of the [[Engine]] and the implementation of - * [[io.prediction.core.BaseEvaluator]] - */ - def engineEvaluator - : (BaseEngine[_, _, _, _], BaseEvaluator[_, _, _, _, _]) = { - assert(_evaluatorSet, "Evaluator not set") - (engine, _evaluator) - } - - /** Sets both an [[Engine]] and an implementation of - * [[io.prediction.core.BaseEvaluator]] for this [[Evaluation]] - * - * @param engineEvaluator A tuple of an [[Engine]] and an implementation of - * [[io.prediction.core.BaseEvaluator]] - * @tparam EI Evaluation information class - * @tparam Q Query class - * @tparam P Predicted result class - * @tparam A Actual result class - * @tparam R Metric result class - */ - def engineEvaluator_=[EI, Q, P, A, R <: BaseEvaluatorResult]( - engineEvaluator: ( - BaseEngine[EI, Q, P, A], - BaseEvaluator[EI, Q, P, A, R])) { - assert(!_evaluatorSet, "Evaluator can be set at most once") - engine = engineEvaluator._1 - _evaluator = engineEvaluator._2 - _evaluatorSet = true - } - - /** Returns both the [[Engine]] and the implementation of [[Metric]] for this - * [[Evaluation]] - */ - def engineMetric: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = { - throw new NotImplementedError("This method is to keep the compiler happy") - } - - /** Sets both an [[Engine]] and an implementation of [[Metric]] for this - * [[Evaluation]] - * - * @param engineMetric A tuple of [[Engine]] and an implementation of - * [[Metric]] - * @tparam EI Evaluation information class - * @tparam Q Query class - * @tparam P Predicted result class - * @tparam A Actual result class - */ - def engineMetric_=[EI, Q, P, A]( 
engineMetric: (BaseEngine[EI, Q, P, A], Metric[EI, Q, P, A, _])) { - engineEvaluator = ( - engineMetric._1, - MetricEvaluator( - metric = engineMetric._2, - otherMetrics = Seq[Metric[EI, Q, P, A, _]](), - outputPath = "best.json")) - } - - private [prediction] - def engineMetrics: (BaseEngine[_, _, _, _], Metric[_, _, _, _, _]) = { - throw new NotImplementedError("This method is to keep the compiler happy") - } - - /** Sets an [[Engine]], an implementation of [[Metric]], and sequence of - * implementations of [[Metric]] for this [[Evaluation]] - * - * @param engineMetrics A tuple of [[Engine]], an implementation of - * [[Metric]] and sequence of implementations of [[Metric]] - * @tparam EI Evaluation information class - * @tparam Q Query class - * @tparam P Predicted result class - * @tparam A Actual result class - */ - def engineMetrics_=[EI, Q, P, A]( - engineMetrics: ( - BaseEngine[EI, Q, P, A], - Metric[EI, Q, P, A, _], - Seq[Metric[EI, Q, P, A, _]])) { - engineEvaluator = ( - engineMetrics._1, - MetricEvaluator(engineMetrics._2, engineMetrics._3)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala b/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala deleted file mode 100644 index 8e9727e..0000000 --- a/core/src/main/scala/io/prediction/controller/FastEvalEngine.scala +++ /dev/null @@ -1,343 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import io.prediction.core.BaseDataSource -import io.prediction.core.BasePreparator -import io.prediction.core.BaseAlgorithm -import io.prediction.core.BaseServing -import io.prediction.core.Doer -import io.prediction.annotation.Experimental - -import grizzled.slf4j.Logger -import io.prediction.workflow.WorkflowParams -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import scala.language.implicitConversions - -import _root_.java.util.NoSuchElementException - -import scala.collection.mutable.{ HashMap => MutableHashMap } - -/** :: Experimental :: - * Workflow based on [[FastEvalEngine]] - * - * @group Evaluation - */ -@Experimental -object FastEvalEngineWorkflow { - @transient lazy val logger = Logger[this.type] - - type EX = Int - type AX = Int - type QX = Long - - case class DataSourcePrefix(dataSourceParams: (String, Params)) { - def this(pp: PreparatorPrefix) = this(pp.dataSourceParams) - def this(ap: AlgorithmsPrefix) = this(ap.dataSourceParams) - def this(sp: ServingPrefix) = this(sp.dataSourceParams) - } - - case class PreparatorPrefix( - dataSourceParams: (String, Params), - preparatorParams: (String, Params)) { - def this(ap: AlgorithmsPrefix) = { - this(ap.dataSourceParams, ap.preparatorParams) - } - } - - case class AlgorithmsPrefix( - dataSourceParams: (String, Params), - preparatorParams: (String, Params), - algorithmParamsList: Seq[(String, Params)]) { - def this(sp: ServingPrefix) = { - this(sp.dataSourceParams, 
sp.preparatorParams, sp.algorithmParamsList) - } - } - - case class ServingPrefix( - dataSourceParams: (String, Params), - preparatorParams: (String, Params), - algorithmParamsList: Seq[(String, Params)], - servingParams: (String, Params)) { - def this(ep: EngineParams) = this( - ep.dataSourceParams, - ep.preparatorParams, - ep.algorithmParamsList, - ep.servingParams) - } - - def getDataSourceResult[TD, EI, PD, Q, P, A]( - workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], - prefix: DataSourcePrefix) - : Map[EX, (TD, EI, RDD[(QX, (Q, A))])] = { - val cache = workflow.dataSourceCache - - if (!cache.contains(prefix)) { - val dataSource = Doer( - workflow.engine.dataSourceClassMap(prefix.dataSourceParams._1), - prefix.dataSourceParams._2) - - val result = dataSource - .readEvalBase(workflow.sc) - .map { case (td, ei, qaRDD) => { - (td, ei, qaRDD.zipWithUniqueId().map(_.swap)) - }} - .zipWithIndex - .map(_.swap) - .toMap - - cache += Tuple2(prefix, result) - } - cache(prefix) - } - - def getPreparatorResult[TD, EI, PD, Q, P, A]( - workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], - prefix: PreparatorPrefix): Map[EX, PD] = { - val cache = workflow.preparatorCache - - if (!cache.contains(prefix)) { - val preparator = Doer( - workflow.engine.preparatorClassMap(prefix.preparatorParams._1), - prefix.preparatorParams._2) - - val result = getDataSourceResult( - workflow = workflow, - prefix = new DataSourcePrefix(prefix)) - .mapValues { case (td, _, _) => preparator.prepareBase(workflow.sc, td) } - - cache += Tuple2(prefix, result) - } - cache(prefix) - } - - def computeAlgorithmsResult[TD, EI, PD, Q, P, A]( - workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], - prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = { - - val algoMap: Map[AX, BaseAlgorithm[PD, _, Q, P]] = prefix.algorithmParamsList - .map { case (algoName, algoParams) => { - try { - Doer(workflow.engine.algorithmClassMap(algoName), algoParams) - } catch { - case e: NoSuchElementException => { - 
val algorithmClassMap = workflow.engine.algorithmClassMap - if (algoName == "") { - logger.error("Empty algorithm name supplied but it could not " + - "match with any algorithm in the engine's definition. " + - "Existing algorithm name(s) are: " + - s"${algorithmClassMap.keys.mkString(", ")}. Aborting.") - } else { - logger.error(s"${algoName} cannot be found in the engine's " + - "definition. Existing algorithm name(s) are: " + - s"${algorithmClassMap.keys.mkString(", ")}. Aborting.") - } - sys.exit(1) - } - } - }} - .zipWithIndex - .map(_.swap) - .toMap - - val algoCount = algoMap.size - - // Model Train - val algoModelsMap: Map[EX, Map[AX, Any]] = getPreparatorResult( - workflow, - new PreparatorPrefix(prefix)) - .mapValues { - pd => algoMap.mapValues(_.trainBase(workflow.sc,pd)) - } - - // Predict - val dataSourceResult = - FastEvalEngineWorkflow.getDataSourceResult( - workflow = workflow, - prefix = new DataSourcePrefix(prefix)) - - val algoResult: Map[EX, RDD[(QX, Seq[P])]] = dataSourceResult - .par - .map { case (ex, (td, ei, iqaRDD)) => { - val modelsMap: Map[AX, Any] = algoModelsMap(ex) - val qs: RDD[(QX, Q)] = iqaRDD.mapValues(_._1) - - val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount) - .map { ax => { - val algo = algoMap(ax) - val model = modelsMap(ax) - val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase( - workflow.sc, - model, - qs) - - val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { - case (qx, p) => (qx, (ax, p)) - } - predicts - }} - - val unionAlgoPredicts: RDD[(QX, Seq[P])] = workflow.sc - .union(algoPredicts) - .groupByKey - .mapValues { ps => { - assert (ps.size == algoCount, "Must have same length as algoCount") - // TODO. 
Check size == algoCount - ps.toSeq.sortBy(_._1).map(_._2) - }} - (ex, unionAlgoPredicts) - }} - .seq - .toMap - - algoResult - } - - def getAlgorithmsResult[TD, EI, PD, Q, P, A]( - workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], - prefix: AlgorithmsPrefix): Map[EX, RDD[(QX, Seq[P])]] = { - val cache = workflow.algorithmsCache - if (!cache.contains(prefix)) { - val result = computeAlgorithmsResult(workflow, prefix) - cache += Tuple2(prefix, result) - } - cache(prefix) - } - - def getServingResult[TD, EI, PD, Q, P, A]( - workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], - prefix: ServingPrefix) - : Seq[(EI, RDD[(Q, P, A)])] = { - val cache = workflow.servingCache - if (!cache.contains(prefix)) { - val serving = Doer( - workflow.engine.servingClassMap(prefix.servingParams._1), - prefix.servingParams._2) - - val algoPredictsMap = getAlgorithmsResult( - workflow = workflow, - prefix = new AlgorithmsPrefix(prefix)) - - val dataSourceResult = getDataSourceResult( - workflow = workflow, - prefix = new DataSourcePrefix(prefix)) - - val evalQAsMap = dataSourceResult.mapValues(_._3) - val evalInfoMap = dataSourceResult.mapValues(_._2) - - val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap - .map { case (ex, psMap) => { - val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex) - val qpsaMap: RDD[(QX, Q, Seq[P], A)] = psMap.join(qasMap) - .map { case (qx, t) => (qx, t._2._1, t._1, t._2._2) } - - val qpaMap: RDD[(Q, P, A)] = qpsaMap.map { - case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a) - } - (ex, qpaMap) - }} - - val servingResult = (0 until evalQAsMap.size).map { ex => { - (evalInfoMap(ex), servingQPAMap(ex)) - }} - .toSeq - - cache += Tuple2(prefix, servingResult) - } - cache(prefix) - } - - def get[TD, EI, PD, Q, P, A]( - workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], - engineParamsList: Seq[EngineParams]) - : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = { - engineParamsList.map { engineParams => { - (engineParams, - 
getServingResult(workflow, new ServingPrefix(engineParams))) - }} - } -} - -/** :: Experimental :: - * Workflow based on [[FastEvalEngine]] - * - * @group Evaluation - */ -@Experimental -class FastEvalEngineWorkflow[TD, EI, PD, Q, P, A]( - val engine: FastEvalEngine[TD, EI, PD, Q, P, A], - val sc: SparkContext, - val workflowParams: WorkflowParams) extends Serializable { - - import io.prediction.controller.FastEvalEngineWorkflow._ - - type DataSourceResult = Map[EX, (TD, EI, RDD[(QX, (Q, A))])] - type PreparatorResult = Map[EX, PD] - type AlgorithmsResult = Map[EX, RDD[(QX, Seq[P])]] - type ServingResult = Seq[(EI, RDD[(Q, P, A)])] - - val dataSourceCache = MutableHashMap[DataSourcePrefix, DataSourceResult]() - val preparatorCache = MutableHashMap[PreparatorPrefix, PreparatorResult]() - val algorithmsCache = MutableHashMap[AlgorithmsPrefix, AlgorithmsResult]() - val servingCache = MutableHashMap[ServingPrefix, ServingResult]() -} - - - -/** :: Experimental :: - * FastEvalEngine is a subclass of [[Engine]] that exploits the immutability of - * controllers to optimize the evaluation process - * - * @group Evaluation - */ -@Experimental -class FastEvalEngine[TD, EI, PD, Q, P, A]( - dataSourceClassMap: Map[String, Class[_ <: BaseDataSource[TD, EI, Q, A]]], - preparatorClassMap: Map[String, Class[_ <: BasePreparator[TD, PD]]], - algorithmClassMap: Map[String, Class[_ <: BaseAlgorithm[PD, _, Q, P]]], - servingClassMap: Map[String, Class[_ <: BaseServing[Q, P]]]) - extends Engine[TD, EI, PD, Q, P, A]( - dataSourceClassMap, - preparatorClassMap, - algorithmClassMap, - servingClassMap) { - @transient override lazy val logger = Logger[this.type] - - override def eval( - sc: SparkContext, - engineParams: EngineParams, - params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] = { - logger.info("FastEvalEngine.eval") - batchEval(sc, Seq(engineParams), params).head._2 - } - - override def batchEval( - sc: SparkContext, - engineParamsList: Seq[EngineParams], - params: WorkflowParams) 
- : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = { - - val fastEngineWorkflow = new FastEvalEngineWorkflow( - this, sc, params) - - FastEvalEngineWorkflow.get( - fastEngineWorkflow, - engineParamsList) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala b/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala deleted file mode 100644 index 0bf3cb0..0000000 --- a/core/src/main/scala/io/prediction/controller/IdentityPreparator.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import io.prediction.core.BaseDataSource -import io.prediction.core.BasePreparator -import org.apache.spark.SparkContext - -import scala.reflect._ - -/** A helper concrete implementation of [[io.prediction.core.BasePreparator]] - * that passes training data through without any special preparation. This can - * be used in place for both [[PPreparator]] and [[LPreparator]]. - * - * @tparam TD Training data class. 
- * @group Preparator - */ -class IdentityPreparator[TD] extends BasePreparator[TD, TD] { - def prepareBase(sc: SparkContext, td: TD): TD = td -} - -/** Companion object of [[IdentityPreparator]] that conveniently returns an - * instance of the class of [[IdentityPreparator]] for use with - * [[EngineFactory]]. - * - * @group Preparator - */ -object IdentityPreparator { - /** Produces an instance of the class of [[IdentityPreparator]]. - * - * @param ds Instance of the class of the data source for this preparator. - */ - def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] = - classOf[IdentityPreparator[TD]] -} - -/** DEPRECATED. Use [[IdentityPreparator]] instead. - * - * @tparam TD Training data class. - * @group Preparator - */ -@deprecated("Use IdentityPreparator instead.", "0.9.2") -class PIdentityPreparator[TD] extends IdentityPreparator[TD] - -/** DEPRECATED. Use [[IdentityPreparator]] instead. - * - * @group Preparator - */ -@deprecated("Use IdentityPreparator instead.", "0.9.2") -object PIdentityPreparator { - /** Produces an instance of the class of [[IdentityPreparator]]. - * - * @param ds Instance of the class of the data source for this preparator. - */ - def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] = - classOf[IdentityPreparator[TD]] -} - -/** DEPRECATED. Use [[IdentityPreparator]] instead. - * - * @tparam TD Training data class. - * @group Preparator - */ -@deprecated("Use IdentityPreparator instead.", "0.9.2") -class LIdentityPreparator[TD] extends IdentityPreparator[TD] - -/** DEPRECATED. Use [[IdentityPreparator]] instead. - * - * @group Preparator - */ -@deprecated("Use IdentityPreparator instead.", "0.9.2") -object LIdentityPreparator { - /** Produces an instance of the class of [[IdentityPreparator]]. - * - * @param ds Instance of the class of the data source for this preparator. 
- */ - def apply[TD](ds: Class[_ <: BaseDataSource[TD, _, _, _]]): Class[IdentityPreparator[TD]] = - classOf[IdentityPreparator[TD]] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/LAlgorithm.scala b/core/src/main/scala/io/prediction/controller/LAlgorithm.scala deleted file mode 100644 index 467a4a0..0000000 --- a/core/src/main/scala/io/prediction/controller/LAlgorithm.scala +++ /dev/null @@ -1,130 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import _root_.io.prediction.annotation.DeveloperApi -import io.prediction.core.BaseAlgorithm -import io.prediction.workflow.PersistentModelManifest -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -import scala.reflect._ - -/** Base class of a local algorithm. - * - * A local algorithm runs locally within a single machine and produces a model - * that can fit within a single machine. - * - * If your input query class requires custom JSON4S serialization, the most - * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]], - * and mix that into your algorithm class, instead of overriding - * [[querySerializer]] directly. - * - * @tparam PD Prepared data class. 
- * @tparam M Trained model class. - * @tparam Q Input query class. - * @tparam P Output prediction class. - * @group Algorithm - */ -abstract class LAlgorithm[PD, M : ClassTag, Q, P] - extends BaseAlgorithm[RDD[PD], RDD[M], Q, P] { - - def trainBase(sc: SparkContext, pd: RDD[PD]): RDD[M] = pd.map(train) - - /** Implement this method to produce a model from prepared data. - * - * @param pd Prepared data for model training. - * @return Trained model. - */ - def train(pd: PD): M - - def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]) - : RDD[(Long, P)] = { - val mRDD = bm.asInstanceOf[RDD[M]] - batchPredict(mRDD, qs) - } - - /** This is a default implementation to perform batch prediction. Override - * this method for a custom implementation. - * - * @param mRDD A single model wrapped inside an RDD - * @param qs An RDD of index-query tuples. The index is used to keep track of - * predicted results with corresponding queries. - * @return Batch of predicted results - */ - def batchPredict(mRDD: RDD[M], qs: RDD[(Long, Q)]): RDD[(Long, P)] = { - val glomQs: RDD[Array[(Long, Q)]] = qs.glom() - val cartesian: RDD[(M, Array[(Long, Q)])] = mRDD.cartesian(glomQs) - cartesian.flatMap { case (m, qArray) => - qArray.map { case (qx, q) => (qx, predict(m, q)) } - } - } - - def predictBase(localBaseModel: Any, q: Q): P = { - predict(localBaseModel.asInstanceOf[M], q) - } - - /** Implement this method to produce a prediction from a query and trained - * model. - * - * @param m Trained model produced by [[train]]. - * @param q An input query. - * @return A prediction. - */ - def predict(m: M, q: Q): P - - /** :: DeveloperApi :: - * Engine developers should not use this directly (read on to see how local - * algorithm models are persisted). - * - * Local algorithms produce local models. By default, models will be - * serialized and stored automatically. 
Engine developers can override this behavior by - * mixing the [[PersistentModel]] trait into the model class, and - * PredictionIO will call [[PersistentModel.save]] instead. If it returns - * true, a [[io.prediction.workflow.PersistentModelManifest]] will be - * returned so that during deployment, PredictionIO will use - * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be - * returned and the model will be re-trained on-the-fly. - * - * @param sc Spark context - * @param modelId Model ID - * @param algoParams Algorithm parameters that trained this model - * @param bm Model - * @return The model itself for automatic persistence, an instance of - * [[io.prediction.workflow.PersistentModelManifest]] for manual - * persistence, or Unit for re-training on deployment - */ - @DeveloperApi - override - def makePersistentModel( - sc: SparkContext, - modelId: String, - algoParams: Params, - bm: Any): Any = { - // Check RDD[M].count == 1 - val m = bm.asInstanceOf[RDD[M]].first() - if (m.isInstanceOf[PersistentModel[_]]) { - if (m.asInstanceOf[PersistentModel[Params]].save( - modelId, algoParams, sc)) { - PersistentModelManifest(className = m.getClass.getName) - } else { - Unit - } - } else { - m - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LAverageServing.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/LAverageServing.scala b/core/src/main/scala/io/prediction/controller/LAverageServing.scala deleted file mode 100644 index 80981ab..0000000 --- a/core/src/main/scala/io/prediction/controller/LAverageServing.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import io.prediction.core.BaseAlgorithm - -/** A concrete implementation of [[LServing]] returning the average of all - * algorithms' predictions, where their classes are expected to be all Double. - * - * @group Serving - */ -class LAverageServing[Q] extends LServing[Q, Double] { - /** Returns the average of all algorithms' predictions. */ - def serve(query: Q, predictions: Seq[Double]): Double = { - predictions.sum / predictions.length - } -} - -/** A concrete implementation of [[LServing]] returning the average of all - * algorithms' predictions, where their classes are expected to be all Double. - * - * @group Serving - */ -object LAverageServing { - /** Returns an instance of [[LAverageServing]]. */ - def apply[Q](a: Class[_ <: BaseAlgorithm[_, _, Q, _]]): Class[LAverageServing[Q]] = - classOf[LAverageServing[Q]] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/LDataSource.scala b/core/src/main/scala/io/prediction/controller/LDataSource.scala deleted file mode 100644 index aa53c8f..0000000 --- a/core/src/main/scala/io/prediction/controller/LDataSource.scala +++ /dev/null @@ -1,67 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prediction.controller

import io.prediction.core.BaseDataSource
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.reflect._

/** Base class of a local data source.
 *
 * A local data source runs locally within a single machine and returns data
 * that can fit within a single machine.
 *
 * @tparam TD Training data class.
 * @tparam EI Evaluation Info class.
 * @tparam Q Input query class.
 * @tparam A Actual value class.
 * @group Data Source
 */
abstract class LDataSource[TD: ClassTag, EI, Q, A]
  extends BaseDataSource[RDD[TD], EI, Q, A] {

  /** Wraps [[readTraining]] in a one-element RDD. `readTraining()` is called
   * inside the RDD's map function, i.e. when the RDD is computed.
   *
   * @param sc Spark context used to create the RDD.
   */
  def readTrainingBase(sc: SparkContext): RDD[TD] =
    sc.parallelize(Seq(None)).map(_ => readTraining())

  /** Implement this method to return only the training data from a data
   * source.
   */
  def readTraining(): TD

  /** Calls [[readEval]] and lifts each local evaluation set into RDDs: the
   * training data becomes a one-element RDD and the query-actual pairs are
   * parallelized; the evaluation info is passed through unchanged.
   *
   * @param sc Spark context used to create the RDDs.
   */
  def readEvalBase(sc: SparkContext): Seq[(RDD[TD], EI, RDD[(Q, A)])] =
    readEval().map { case (trainingData, evalInfo, queryActuals) =>
      val trainingRDD = sc.parallelize(Seq(None)).map(_ => trainingData)
      (trainingRDD, evalInfo, sc.parallelize(queryActuals))
    }

  /** To provide the evaluation feature for your engine, you must override
   * this method to return data for evaluation from a data source. Returned
   * data can optionally include a sequence of query and actual value pairs
   * for evaluation purposes.
   *
   * The default implementation returns an empty sequence as a stub, so that
   * an engine can be compiled without implementing evaluation.
   */
  def readEval(): Seq[(TD, EI, Seq[(Q, A)])] = Seq.empty[(TD, EI, Seq[(Q, A)])]

  /** Deprecated alias that forwards to [[readEval]]. */
  @deprecated("Use readEval() instead.", "0.9.0")
  def read(): Seq[(TD, EI, Seq[(Q, A)])] = readEval()
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LFirstServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LFirstServing.scala b/core/src/main/scala/io/prediction/controller/LFirstServing.scala
deleted file mode 100644
index 970815e..0000000
--- a/core/src/main/scala/io/prediction/controller/LFirstServing.scala
+++ /dev/null
@@ -1,39 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.prediction.controller

import io.prediction.core.BaseAlgorithm

/** An [[LServing]] implementation that serves the first algorithm's
 * prediction directly, without any modification.
 *
 * @tparam Q Input query class.
 * @tparam P Output prediction class.
 * @group Serving
 */
class LFirstServing[Q, P] extends LServing[Q, P] {
  /** Returns the first element of `predictions`; the query is not consulted.
   * `head` is used, so an empty `predictions` raises an exception.
   *
   * @param query Input query (unused).
   * @param predictions Predictions to choose from.
   */
  def serve(query: Q, predictions: Seq[P]): P = predictions.head
}

/** Companion of [[LFirstServing]]. It exposes the serving class with the
 * query and prediction types taken from an algorithm class.
 *
 * @group Serving
 */
object LFirstServing {
  /** Returns the `Class` object of [[LFirstServing]] (not an instance).
   *
   * @param a Algorithm class; it only serves to fix `Q` and `P`.
   */
  def apply[Q, P](a: Class[_ <: BaseAlgorithm[_, _, Q, P]]): Class[LFirstServing[Q, P]] =
    classOf[LFirstServing[Q, P]]
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LPreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LPreparator.scala b/core/src/main/scala/io/prediction/controller/LPreparator.scala
deleted file mode 100644
index f66dfc0..0000000
--- a/core/src/main/scala/io/prediction/controller/LPreparator.scala
+++ /dev/null
@@ -1,46 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.prediction.controller

import io.prediction.core.BasePreparator
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.reflect._

/** Base class of a local preparator.
 *
 * A local preparator runs locally within a single machine and produces
 * prepared data that can fit within a single machine.
 *
 * @tparam TD Training data class.
 * @tparam PD Prepared data class.
 * @group Preparator
 */
abstract class LPreparator[TD, PD : ClassTag]
  extends BasePreparator[RDD[TD], RDD[PD]] {

  /** Applies [[prepare]] to every element of the training-data RDD.
   *
   * @param sc Spark context (unused here).
   * @param rddTd RDD holding the training data.
   */
  def prepareBase(sc: SparkContext, rddTd: RDD[TD]): RDD[PD] =
    rddTd.map(trainingData => prepare(trainingData))

  /** Implement this method to produce prepared data that is ready for model
   * training.
   *
   * @param trainingData Training data to be prepared.
   */
  def prepare(trainingData: TD): PD
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LServing.scala b/core/src/main/scala/io/prediction/controller/LServing.scala
deleted file mode 100644
index accee48..0000000
--- a/core/src/main/scala/io/prediction/controller/LServing.scala
+++ /dev/null
@@ -1,52 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.prediction.controller

import io.prediction.annotation.Experimental
import io.prediction.core.BaseServing

/** Base class of serving.
 *
 * @tparam Q Input query class.
 * @tparam P Output prediction class.
 * @group Serving
 */
abstract class LServing[Q, P] extends BaseServing[Q, P] {
  /** Forwards to [[supplement]]. */
  def supplementBase(q: Q): Q = supplement(q)

  /** :: Experimental ::
   * Implement this method to supplement the query before sending it to
   * algorithms. The default implementation returns the query unchanged.
   *
   * @param q Query
   * @return A supplemented Query
   */
  @Experimental
  def supplement(q: Q): Q = q

  /** Forwards to [[serve]]. */
  def serveBase(q: Q, ps: Seq[P]): P = serve(q, ps)

  /** Implement this method to combine multiple algorithms' predictions to
   * produce a single final prediction. The query is the original query sent
   * to the engine, not the supplemented query produced by
   * [[LServing.supplement]].
   *
   * @param query Original input query.
   * @param predictions A list of algorithms' predictions.
   */
  def serve(query: Q, predictions: Seq[P]): P
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala b/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala
deleted file mode 100644
index e9f0592..0000000
--- a/core/src/main/scala/io/prediction/controller/LocalFileSystemPersistentModel.scala
+++ /dev/null
@@ -1,74 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.prediction.controller

import org.apache.spark.SparkContext

/** This trait is a convenience helper for persisting your model to the local
 * filesystem. This trait and [[LocalFileSystemPersistentModelLoader]] contain
 * concrete implementations and need not be implemented.
 *
 * The underlying implementation is [[Utils.save]].
 *
 * {{{
 * class MyModel extends LocalFileSystemPersistentModel[MyParams] {
 *   ...
 * }
 *
 * object MyModel extends LocalFileSystemPersistentModelLoader[MyParams, MyModel] {
 *   ...
 * }
 * }}}
 *
 * @tparam AP Algorithm parameters class.
 * @see [[LocalFileSystemPersistentModelLoader]]
 * @group Algorithm
 */
trait LocalFileSystemPersistentModel[AP <: Params] extends PersistentModel[AP] {
  /** Hands this model to [[Utils.save]] under the given ID and reports
   * success.
   *
   * @param id Model ID passed to [[Utils.save]].
   * @param params Algorithm parameters (unused here).
   * @param sc Spark context (unused here).
   * @return Always `true`.
   */
  def save(id: String, params: AP, sc: SparkContext): Boolean = {
    Utils.save(id, this)
    true
  }
}

/** Implement an object that extends this trait for PredictionIO to support
 * loading a persisted model from local filesystem during serving deployment.
 *
 * The underlying implementation is [[Utils.load]].
 *
 * @tparam AP Algorithm parameters class.
 * @tparam M Model class.
 * @see [[LocalFileSystemPersistentModel]]
 * @group Algorithm
 */
trait LocalFileSystemPersistentModelLoader[AP <: Params, M]
  extends PersistentModelLoader[AP, M] {
  /** Loads the model stored under `id` via [[Utils.load]] and casts it to `M`.
   *
   * @param id Model ID passed to [[Utils.load]].
   * @param params Algorithm parameters (unused here).
   * @param sc Optional Spark context (unused here).
   */
  def apply(id: String, params: AP, sc: Option[SparkContext]): M =
    Utils.load(id).asInstanceOf[M]
}

/** DEPRECATED. Use [[LocalFileSystemPersistentModel]] instead.
 *
 * @group Algorithm */
@deprecated("Use LocalFileSystemPersistentModel instead.", "0.9.2")
trait IFSPersistentModel[AP <: Params] extends LocalFileSystemPersistentModel[AP]

/** DEPRECATED. Use [[LocalFileSystemPersistentModelLoader]] instead.
 *
 * @group Algorithm */
@deprecated("Use LocalFileSystemPersistentModelLoader instead.", "0.9.2")
trait IFSPersistentModelLoader[AP <: Params, M] extends LocalFileSystemPersistentModelLoader[AP, M]
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Metric.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/Metric.scala b/core/src/main/scala/io/prediction/controller/Metric.scala
deleted file mode 100644
index 9e56125..0000000
--- a/core/src/main/scala/io/prediction/controller/Metric.scala
+++ /dev/null
@@ -1,266 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prediction.controller

import _root_.io.prediction.controller.java.SerializableComparator
import io.prediction.core.BaseEngine
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.StatCounter

import scala.Numeric.Implicits._
import scala.reflect._

/** Base class of a metric.
 *
 * @param rOrder Ordering used by [[compare]] to rank metric results.
 * @tparam EI Evaluation information
 * @tparam Q Query
 * @tparam P Predicted result
 * @tparam A Actual result
 * @tparam R Metric result
 * @group Evaluation
 */
abstract class Metric[EI, Q, P, A, R](implicit rOrder: Ordering[R])
extends Serializable {
  /** Java friendly constructor
   *
   * @param comparator A serializable comparator for sorting the metric results.
   */
  def this(comparator: SerializableComparator[R]) = {
    this()(Ordering.comparatorToOrdering(comparator))
  }

  /** Simple class name of this metric. */
  def header: String = getClass.getSimpleName

  /** Calculates the result of this metric over all evaluation sets.
   *
   * @param sc Spark context
   * @param evalDataSet Evaluation info paired with an RDD of
   *                    (query, predicted result, actual result) tuples
   */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): R

  /** Compares two metric results using the ordering supplied at construction. */
  def compare(r0: R, r1: R): Int = rOrder.compare(r0, r1)
}

/** Shared helper: maps every QPA tuple to a Double score and collects the
 * statistics of all scores across the evaluation sets.
 */
private [prediction] trait StatsMetricHelper[EI, Q, P, A] {
  /** Score of a single (query, predicted, actual) tuple. */
  def calculate(q: Q, p: P, a: A): Double

  /** Unions the per-set score RDDs and returns their `StatCounter`. */
  def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : StatCounter = {
    val scoreRDDs = evalDataSet.map { case (_, qpaRDD) =>
      qpaRDD.map { case (q, p, a) => calculate(q, p, a) }
    }

    sc.union(scoreRDDs).stats()
  }
}

/** Shared helper: like [[StatsMetricHelper]], but tuples whose score is `None`
 * are dropped before the statistics are collected.
 */
private [prediction] trait StatsOptionMetricHelper[EI, Q, P, A] {
  /** Optional score of a single (query, predicted, actual) tuple. */
  def calculate(q: Q, p: P, a: A): Option[Double]

  /** Unions the per-set RDDs of defined scores and returns their
   * `StatCounter`.
   */
  def calculateStats(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : StatCounter = {
    val scoreRDDs = evalDataSet.map { case (_, qpaRDD) =>
      // flatMap over the Option keeps only the defined scores.
      qpaRDD.flatMap { case (q, p, a) => calculate(q, p, a) }
    }

    sc.union(scoreRDDs).stats()
  }
}

/** Returns the global average of the score returned by the calculate method.
 *
 * @tparam EI Evaluation information
 * @tparam Q Query
 * @tparam P Predicted result
 * @tparam A Actual result
 *
 * @group Evaluation
 */
abstract class AverageMetric[EI, Q, P, A]
    extends Metric[EI, Q, P, A, Double]
    with StatsMetricHelper[EI, Q, P, A]
    with QPAMetric[Q, P, A, Double] {
  /** Implement this method to return a score that will be used for averaging
   * across all QPA tuples.
   */
  def calculate(q: Q, p: P, a: A): Double

  /** Mean of the scores of all QPA tuples in all evaluation sets. */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : Double = calculateStats(sc, evalDataSet).mean
}

/** Returns the global average of the non-None score returned by the calculate
 * method.
 *
 * @tparam EI Evaluation information
 * @tparam Q Query
 * @tparam P Predicted result
 * @tparam A Actual result
 *
 * @group Evaluation
 */
abstract class OptionAverageMetric[EI, Q, P, A]
    extends Metric[EI, Q, P, A, Double]
    with StatsOptionMetricHelper[EI, Q, P, A]
    with QPAMetric[Q, P, A, Option[Double]] {
  /** Implement this method to return a score that will be used for averaging
   * across all QPA tuples. Tuples yielding `None` are left out of the average.
   */
  def calculate(q: Q, p: P, a: A): Option[Double]

  /** Mean of the defined scores of all QPA tuples in all evaluation sets. */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : Double = calculateStats(sc, evalDataSet).mean
}

/** Returns the global standard deviation of the score returned by the
 * calculate method.
 *
 * The value is taken from org.apache.spark.util.StatCounter, which computes
 * it in a single pass.
 *
 * @tparam EI Evaluation information
 * @tparam Q Query
 * @tparam P Predicted result
 * @tparam A Actual result
 *
 * @group Evaluation
 */
abstract class StdevMetric[EI, Q, P, A]
    extends Metric[EI, Q, P, A, Double]
    with StatsMetricHelper[EI, Q, P, A]
    with QPAMetric[Q, P, A, Double] {
  /** Implement this method to return a score that will be used for calculating
   * the stdev across all QPA tuples.
   */
  def calculate(q: Q, p: P, a: A): Double

  /** `StatCounter.stdev` of the scores of all QPA tuples. */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : Double = calculateStats(sc, evalDataSet).stdev
}

/** Returns the global standard deviation of the non-None score returned by the
 * calculate method.
 *
 * The value is taken from org.apache.spark.util.StatCounter, which computes
 * it in a single pass.
 *
 * @tparam EI Evaluation information
 * @tparam Q Query
 * @tparam P Predicted result
 * @tparam A Actual result
 *
 * @group Evaluation
 */
abstract class OptionStdevMetric[EI, Q, P, A]
    extends Metric[EI, Q, P, A, Double]
    with StatsOptionMetricHelper[EI, Q, P, A]
    with QPAMetric[Q, P, A, Option[Double]] {
  /** Implement this method to return a score that will be used for calculating
   * the stdev across all QPA tuples. Tuples yielding `None` are left out.
   */
  def calculate(q: Q, p: P, a: A): Option[Double]

  /** `StatCounter.stdev` of the defined scores of all QPA tuples. */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : Double = calculateStats(sc, evalDataSet).stdev
}

/** Returns the sum of the score returned by the calculate method.
 *
 * @param num Numeric instance of `R`; it supplies zero and addition, and also
 *            serves as the ordering of the metric results.
 * @tparam EI Evaluation information
 * @tparam Q Query
 * @tparam P Predicted result
 * @tparam A Actual result
 * @tparam R Result, output of the function calculate, must be Numeric
 *
 * @group Evaluation
 */
abstract class SumMetric[EI, Q, P, A, R: ClassTag](implicit num: Numeric[R])
    extends Metric[EI, Q, P, A, R]()(num)
    with QPAMetric[Q, P, A, R] {
  /** Implement this method to return a score that will be used for summing
   * across all QPA tuples.
   */
  def calculate(q: Q, p: P, a: A): R

  /** Sum of the scores of all QPA tuples in all evaluation sets, starting
   * from `num.zero`.
   */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])])
  : R = {
    val scores: RDD[R] = sc.union(
      evalDataSet.map { case (_, qpaRDD) =>
        qpaRDD.map { case (q, p, a) => calculate(q, p, a) }
      }
    )

    // The same addition is used within a partition and across partitions.
    scores.aggregate[R](num.zero)(
      (acc, score) => num.plus(acc, score),
      (left, right) => num.plus(left, right))
  }
}

/** Returns zero. Useful as a placeholder during evaluation development when
 * not all components are implemented.
 *
 * @tparam EI Evaluation information
 * @tparam Q Query
 * @tparam P Predicted result
 * @tparam A Actual result
 *
 * @group Evaluation
 */
class ZeroMetric[EI, Q, P, A] extends Metric[EI, Q, P, A, Double]() {
  /** Always 0.0; neither argument is used. */
  def calculate(sc: SparkContext, evalDataSet: Seq[(EI, RDD[(Q, P, A)])]): Double = 0.0
}

/** Companion object of [[ZeroMetric]]
 *
 * @group Evaluation
 */
object ZeroMetric {
  /** Returns a ZeroMetric instance using Engine's type parameters.
   *
   * @param engine Engine; it only serves to fix the type parameters.
   */
  def apply[EI, Q, P, A](engine: BaseEngine[EI, Q, P, A]): ZeroMetric[EI, Q, P, A] =
    new ZeroMetric[EI, Q, P, A]()
}


/** Trait for metric which returns a score based on Query, PredictedResult,
 * and ActualResult
 *
 * @tparam Q Query class
 * @tparam P Predicted result class
 * @tparam A Actual result class
 * @tparam R Metric result class
 * @group Evaluation
 */
trait QPAMetric[Q, P, A, R] {
  /** Calculate a metric result based on query, predicted result, and actual
   * result
   *
   * @param q Query
   * @param p Predicted result
   * @param a Actual result
   * @return Metric result
   */
  def calculate(q: Q, p: P, a: A): R
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala b/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala
deleted file mode 100644
index 41ccc9c..0000000
--- a/core/src/main/scala/io/prediction/controller/MetricEvaluator.scala
+++ /dev/null
@@ -1,260 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prediction.controller

import _root_.java.io.File
import _root_.java.io.PrintWriter

import com.github.nscala_time.time.Imports.DateTime
import grizzled.slf4j.Logger
import io.prediction.annotation.DeveloperApi
import io.prediction.core.BaseEvaluator
import io.prediction.core.BaseEvaluatorResult
import io.prediction.data.storage.Storage
import io.prediction.workflow.JsonExtractor
import io.prediction.workflow.JsonExtractorOption.Both
import io.prediction.workflow.NameParamsSerializer
import io.prediction.workflow.WorkflowParams
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.json4s.native.Serialization.write
import org.json4s.native.Serialization.writePretty

import scala.language.existentials

/** Case class storing a primary score, and other scores
 *
 * @param score Primary metric score
 * @param otherScores Other scores this metric might have
 * @tparam R Type of the primary metric score
 * @group Evaluation
 */
case class MetricScores[R](
  score: R,
  otherScores: Seq[Any])

/** Contains all results of a [[MetricEvaluator]]
 *
 * @param bestScore The best score among all iterations
 * @param bestEngineParams The set of engine parameters that yielded the best score
 * @param bestIdx The index of iteration that yielded the best score
 * @param metricHeader Brief description of the primary metric score
 * @param otherMetricHeaders Brief descriptions of other metric scores
 * @param engineParamsScores All sets of engine parameters and corresponding metric scores
 * @param outputPath An optional output path where scores are saved
 * @tparam R Type of the primary metric score
 * @group Evaluation
 */
case class MetricEvaluatorResult[R](
  bestScore: MetricScores[R],
  bestEngineParams: EngineParams,
  bestIdx: Int,
  metricHeader: String,
  otherMetricHeaders: Seq[String],
  engineParamsScores: Seq[(EngineParams, MetricScores[R])],
  outputPath: Option[String])
extends BaseEvaluatorResult {

  /** One-line summary with the best params index and the best primary score.
   *
   * The index is looked up as the position of `bestEngineParams` within
   * `engineParamsScores` (it is -1 when that object is not found there); the
   * stored `bestIdx` is not consulted.
   */
  override def toOneLiner(): String = {
    val idx = engineParamsScores.map(_._1).indexOf(bestEngineParams)
    s"Best Params Index: $idx Score: ${bestScore.score}"
  }

  /** Serializes this result to JSON, using [[NameParamsSerializer]] on top of
   * the default formats.
   */
  override def toJSON(): String = {
    implicit lazy val formats = Utils.json4sDefaultFormats +
      new NameParamsSerializer
    write(this)
  }

  /** Renders the `metric_evaluator` HTML template. */
  override def toHTML(): String = html.metric_evaluator().toString()

  /** Multi-line, human-readable report: number of candidates, the best engine
   * params as pretty JSON, the primary and other metric scores, and the
   * output path when one is set.
   */
  override def toString: String = {
    implicit lazy val formats = Utils.json4sDefaultFormats +
      new NameParamsSerializer

    val bestEPStr = JsonExtractor.engineParamstoPrettyJson(Both, bestEngineParams)

    val summaryLines = Seq(
      "MetricEvaluatorResult:",
      s" # engine params evaluated: ${engineParamsScores.size}",
      "Optimal Engine Params:",
      s" $bestEPStr",
      "Metrics:",
      s" $metricHeader: ${bestScore.score}")

    // zip pairs each header with its score and stops at the shorter sequence.
    val otherMetricLines = otherMetricHeaders.zip(bestScore.otherScores).map {
      case (h, s) => s" $h: $s"
    }

    val outputPathLine = outputPath.toSeq.map {
      p => s"The best variant params can be found in $p"
    }

    (summaryLines ++ otherMetricLines ++ outputPathLine).mkString("\n")
  }
}

/** Companion object of [[MetricEvaluator]]
 *
 * @group Evaluation
 */
object MetricEvaluator {
  /** Creates an evaluator with secondary metrics that also writes the best
   * engine params to `outputPath`.
   */
  def apply[EI, Q, P, A, R](
    metric: Metric[EI, Q, P, A, R],
    otherMetrics: Seq[Metric[EI, Q, P, A, _]],
    outputPath: String): MetricEvaluator[EI, Q, P, A, R] = {
    new MetricEvaluator[EI, Q, P, A, R](
      metric,
      otherMetrics,
      Some(outputPath))
  }

  /** Creates an evaluator with secondary metrics and no output file. */
  def apply[EI, Q, P, A, R](
    metric: Metric[EI, Q, P, A, R],
    otherMetrics: Seq[Metric[EI, Q, P, A, _]])
  : MetricEvaluator[EI, Q, P, A, R] = {
    new MetricEvaluator[EI, Q, P, A, R](
      metric,
      otherMetrics,
      None)
  }

  /** Creates an evaluator with only a primary metric and no output file. */
  def apply[EI, Q, P, A, R](metric: Metric[EI, Q, P, A, R])
  : MetricEvaluator[EI, Q, P, A, R] = {
    new MetricEvaluator[EI, Q, P, A, R](
      metric,
      Seq[Metric[EI, Q, P, A, _]](),
      None)
  }

  /** A component name together with its parameters. */
  case class NameParams(name: String, params: Params) {
    /** Builds the pair from a (name, params) tuple as used by [[EngineParams]]. */
    def this(np: (String, Params)) = this(np._1, np._2)
  }

  /** Serializable description of one engine variant: identification fields
   * plus the name/params of every engine component.
   */
  case class EngineVariant(
    id: String,
    description: String,
    engineFactory: String,
    datasource: NameParams,
    preparator: NameParams,
    algorithms: Seq[NameParams],
    serving: NameParams) {

    /** Builds a variant from engine params, with empty `id` and `description`
     * and the evaluation's class name as `engineFactory`.
     */
    def this(evaluation: Evaluation, engineParams: EngineParams) = this(
      id = "",
      description = "",
      engineFactory = evaluation.getClass.getName,
      datasource = new NameParams(engineParams.dataSourceParams),
      preparator = new NameParams(engineParams.preparatorParams),
      algorithms = engineParams.algorithmParamsList.map(np => new NameParams(np)),
      serving = new NameParams(engineParams.servingParams))
  }
}

/** :: DeveloperApi ::
 * Do not use this directly. Use [[MetricEvaluator$]] instead. This is an
 * implementation of [[io.prediction.core.BaseEvaluator]] that evaluates
 * prediction performance based on metric scores.
 *
 * @param metric Primary metric
 * @param otherMetrics Other metrics
 * @param outputPath Optional output path to save evaluation results
 * @tparam EI Evaluation information type
 * @tparam Q Query class
 * @tparam P Predicted result class
 * @tparam A Actual result class
 * @tparam R Metric result class
 * @group Evaluation
 */
@DeveloperApi
class MetricEvaluator[EI, Q, P, A, R] (
  val metric: Metric[EI, Q, P, A, R],
  val otherMetrics: Seq[Metric[EI, Q, P, A, _]],
  val outputPath: Option[String])
  extends BaseEvaluator[EI, Q, P, A, MetricEvaluatorResult[R]] {
  @transient lazy val logger = Logger[this.type]
  @transient val engineInstances = Storage.getMetaDataEngineInstances()

  /** Writes the given engine params to `outputPath` as a pretty-printed JSON
   * engine variant. The variant's `id` is the evaluation class name followed
   * by the current time, and its `engineFactory` is the evaluation class name.
   *
   * @param evaluation Evaluation whose class name identifies the variant
   * @param engineParams Engine params to save
   * @param outputPath Path of the file to write
   */
  def saveEngineJson(
    evaluation: Evaluation,
    engineParams: EngineParams,
    outputPath: String): Unit = {

    val now = DateTime.now
    val evalClassName = evaluation.getClass.getName

    // The secondary constructor already maps engine params to a variant; only
    // the id differs from what it produces.
    val variant = new MetricEvaluator.EngineVariant(evaluation, engineParams)
      .copy(id = s"$evalClassName $now")

    implicit lazy val formats = Utils.json4sDefaultFormats

    // Serialize before opening the file, so a serialization failure cannot
    // leave a truncated file behind.
    val json = writePretty(variant)

    logger.info(s"Writing best variant params to disk ($outputPath)...")
    val writer = new PrintWriter(new File(outputPath))
    try {
      writer.write(json)
    } finally {
      // Release the file handle even when the write fails.
      writer.close()
    }
  }

  /** Scores every engine-params candidate with the primary and the other
   * metrics, logs each result, picks the candidate with the highest primary
   * score according to `metric.compare` (the earlier candidate wins a tie),
   * and saves it when `outputPath` is set.
   *
   * `reduce` is used to pick the best candidate, so an empty
   * `engineEvalDataSet` raises an exception.
   *
   * @param sc Spark context
   * @param evaluation Evaluation being run
   * @param engineEvalDataSet Engine params paired with their evaluation data
   * @param params Workflow params (unused here)
   * @return The scores of all candidates together with the best one
   */
  def evaluateBase(
    sc: SparkContext,
    evaluation: Evaluation,
    engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])],
    params: WorkflowParams): MetricEvaluatorResult[R] = {

    // Candidates go through a parallel collection; `.seq` restores a
    // sequential Seq in the original candidate order.
    val evalResultList: Seq[(EngineParams, MetricScores[R])] = engineEvalDataSet
      .par
      .map { case (engineParams, evalDataSet) =>
        val metricScores = MetricScores[R](
          metric.calculate(sc, evalDataSet),
          otherMetrics.map(_.calculate(sc, evalDataSet)))
        (engineParams, metricScores)
      }
      .seq

    implicit lazy val formats = Utils.json4sDefaultFormats +
      new NameParamsSerializer

    evalResultList.zipWithIndex.foreach { case ((ep, r), idx) =>
      logger.info(s"Iteration $idx")
      logger.info(s"EngineParams: ${JsonExtractor.engineParamsToJson(Both, ep)}")
      logger.info(s"Result: $r")
    }

    // use max. take implicit from Metric.
    val ((bestEngineParams, bestScore), bestIdx) = evalResultList
      .zipWithIndex
      .reduce { (x, y) =>
        if (metric.compare(x._1._2.score, y._1._2.score) >= 0) x else y
      }

    // save engine params if it is set.
    outputPath.foreach { path => saveEngineJson(evaluation, bestEngineParams, path) }

    MetricEvaluatorResult(
      bestScore = bestScore,
      bestEngineParams = bestEngineParams,
      bestIdx = bestIdx,
      metricHeader = metric.header,
      otherMetricHeaders = otherMetrics.map(_.header),
      engineParamsScores = evalResultList,
      outputPath = outputPath)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala b/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala
deleted file mode 100644
index c59b9af..0000000
--- a/core/src/main/scala/io/prediction/controller/P2LAlgorithm.scala
+++ /dev/null
@@ -1,121 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prediction.controller

import _root_.io.prediction.annotation.DeveloperApi
import io.prediction.core.BaseAlgorithm
import io.prediction.workflow.PersistentModelManifest
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

import scala.reflect._

/** Base class of a parallel-to-local algorithm.
 *
 * A parallel-to-local algorithm can be run in parallel on a cluster and
 * produces a model that can fit within a single machine.
 *
 * If your input query class requires custom JSON4S serialization, the most
 * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]],
 * and mix that into your algorithm class, instead of overriding
 * [[querySerializer]] directly.
 *
 * @tparam PD Prepared data class.
 * @tparam M Trained model class.
 * @tparam Q Input query class.
 * @tparam P Output prediction class.
 * @group Algorithm
 */
abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P]
  extends BaseAlgorithm[PD, M, Q, P] {

  /** Forwards to [[train]]. */
  def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd)

  /** Implement this method to produce a model from prepared data.
   *
   * @param sc Spark context.
   * @param pd Prepared data for model training.
   * @return Trained model.
   */
  def train(sc: SparkContext, pd: PD): M

  /** Casts the untyped model to `M` and forwards to [[batchPredict]]. */
  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
  : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs)

  /** This is a default implementation to perform batch prediction: it applies
   * [[predict]] to the query of every index-query tuple. Override this method
   * for a custom implementation.
   *
   * @param m A model
   * @param qs An RDD of index-query tuples. The index is used to keep track of
   *           predicted results with corresponding queries.
   * @return Batch of predicted results
   */
  def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] =
    qs.mapValues(query => predict(m, query))

  /** Casts the untyped model to `M` and forwards to [[predict]]. */
  def predictBase(bm: Any, q: Q): P = predict(bm.asInstanceOf[M], q)

  /** Implement this method to produce a prediction from a query and trained
   * model.
   *
   * @param model Trained model produced by [[train]].
   * @param query An input query.
   * @return A prediction.
   */
  def predict(model: M, query: Q): P

  /** :: DeveloperApi ::
   * Engine developers should not use this directly (read on to see how
   * parallel-to-local algorithm models are persisted).
   *
   * Parallel-to-local algorithms produce local models. By default, models will be
   * serialized and stored automatically. Engine developers can override this behavior by
   * mixing the [[PersistentModel]] trait into the model class, and
   * PredictionIO will call [[PersistentModel.save]] instead. If it returns
   * true, a [[io.prediction.workflow.PersistentModelManifest]] will be
   * returned so that during deployment, PredictionIO will use
   * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be
   * returned and the model will be re-trained on-the-fly.
   *
   * @param sc Spark context
   * @param modelId Model ID
   * @param algoParams Algorithm parameters that trained this model
   * @param bm Model
   * @return The model itself for automatic persistence, an instance of
   *         [[io.prediction.workflow.PersistentModelManifest]] for manual
   *         persistence, or Unit for re-training on deployment
   */
  @DeveloperApi
  override
  def makePersistentModel(
    sc: SparkContext,
    modelId: String,
    algoParams: Params,
    bm: Any): Any = {
    bm.asInstanceOf[M] match {
      case persistent: PersistentModel[_] =>
        val saved = persistent.asInstanceOf[PersistentModel[Params]].save(
          modelId, algoParams, sc)
        if (saved) {
          PersistentModelManifest(className = persistent.getClass.getName)
        } else {
          // NOTE(review): this is the `Unit` companion object, not the value
          // `()`. It is the sentinel named in the documented contract, so
          // presumably callers test for it; check them before changing it.
          Unit
        }
      case plainModel =>
        // Not a PersistentModel: hand the model back for automatic persistence.
        plainModel
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/PAlgorithm.scala b/core/src/main/scala/io/prediction/controller/PAlgorithm.scala
deleted file mode 100644
index e9916be..0000000
--- a/core/src/main/scala/io/prediction/controller/PAlgorithm.scala
+++ /dev/null
@@ -1,126 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
- */ - -package io.prediction.controller - -import io.prediction.annotation.DeveloperApi -import io.prediction.core.BaseAlgorithm -import io.prediction.workflow.PersistentModelManifest -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -/** Base class of a parallel algorithm. - * - * A parallel algorithm can be run in parallel on a cluster and produces a - * model that can also be distributed across a cluster. - * - * If your input query class requires custom JSON4S serialization, the most - * idiomatic way is to implement a trait that extends [[CustomQuerySerializer]], - * and mix that into your algorithm class, instead of overriding - * [[querySerializer]] directly. - * - * To provide evaluation feature, one must override and implement the - * [[batchPredict]] method. Otherwise, an exception will be thrown when `pio eval` - * is used. - * - * @tparam PD Prepared data class. - * @tparam M Trained model class. - * @tparam Q Input query class. - * @tparam P Output prediction class. - * @group Algorithm - */ -abstract class PAlgorithm[PD, M, Q, P] - extends BaseAlgorithm[PD, M, Q, P] { - - def trainBase(sc: SparkContext, pd: PD): M = train(sc, pd) - - /** Implement this method to produce a model from prepared data. - * - * @param pd Prepared data for model training. - * @return Trained model. - */ - def train(sc: SparkContext, pd: PD): M - - def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)]) - : RDD[(Long, P)] = batchPredict(bm.asInstanceOf[M], qs) - - /** To provide evaluation feature, one must override and implement this method - * to generate many predictions in batch. Otherwise, an exception will be - * thrown when `pio eval` is used. - * - * The default implementation throws an exception. - * - * @param m Trained model produced by [[train]]. - * @param qs An RDD of index-query tuples. The index is used to keep track of - * predicted results with corresponding queries.
- */ - def batchPredict(m: M, qs: RDD[(Long, Q)]): RDD[(Long, P)] = - throw new NotImplementedError("batchPredict not implemented") - - def predictBase(baseModel: Any, query: Q): P = { - predict(baseModel.asInstanceOf[M], query) - } - - /** Implement this method to produce a prediction from a query and trained - * model. - * - * @param model Trained model produced by [[train]]. - * @param query An input query. - * @return A prediction. - */ - def predict(model: M, query: Q): P - - /** :: DeveloperApi :: - * Engine developers should not use this directly (read on to see how parallel - * algorithm models are persisted). - * - * In general, parallel models may contain multiple RDDs. It is not easy to - * infer and persist them programmatically since these RDDs may be - * potentially huge. To persist these models, engine developers need to mix - * the [[PersistentModel]] trait into the model class and implement - * [[PersistentModel.save]]. If it returns true, a - * [[io.prediction.workflow.PersistentModelManifest]] will be - * returned so that during deployment, PredictionIO will use - * [[PersistentModelLoader]] to retrieve the model. Otherwise, Unit will be - * returned and the model will be re-trained on-the-fly. 
- * - * @param sc Spark context - * @param modelId Model ID - * @param algoParams Algorithm parameters that trained this model - * @param bm Model - * @return The model itself for automatic persistence, an instance of - * [[io.prediction.workflow.PersistentModelManifest]] for manual - * persistence, or Unit for re-training on deployment - */ - @DeveloperApi - override - def makePersistentModel( - sc: SparkContext, - modelId: String, - algoParams: Params, - bm: Any): Any = { - val m = bm.asInstanceOf[M] - if (m.isInstanceOf[PersistentModel[_]]) { - if (m.asInstanceOf[PersistentModel[Params]].save( - modelId, algoParams, sc)) { - PersistentModelManifest(className = m.getClass.getName) - } else { - Unit - } - } else { - Unit - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/PDataSource.scala b/core/src/main/scala/io/prediction/controller/PDataSource.scala deleted file mode 100644 index 55a2cf9..0000000 --- a/core/src/main/scala/io/prediction/controller/PDataSource.scala +++ /dev/null @@ -1,57 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.controller - -import io.prediction.core.BaseDataSource -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -/** Base class of a parallel data source. - * - * A parallel data source runs locally within a single machine, or in parallel - * on a cluster, to return data that is distributed across a cluster. - * - * @tparam TD Training data class. - * @tparam EI Evaluation Info class. - * @tparam Q Input query class. - * @tparam A Actual value class. - * @group Data Source - */ - -abstract class PDataSource[TD, EI, Q, A] - extends BaseDataSource[TD, EI, Q, A] { - - def readTrainingBase(sc: SparkContext): TD = readTraining(sc) - - /** Implement this method to only return training data from a data source */ - def readTraining(sc: SparkContext): TD - - def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc) - - /** To provide evaluation feature for your engine, you must override this - * method to return data for evaluation from a data source. Returned data can - * optionally include a sequence of query and actual value pairs for - * evaluation purpose. - * - * The default implementation returns an empty sequence as a stub, so that - * an engine can be compiled without implementing evaluation.
- */ - def readEval(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = - Seq[(TD, EI, RDD[(Q, A)])]() - - @deprecated("Use readEval() instead.", "0.9.0") - def read(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] = readEval(sc) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/PPreparator.scala b/core/src/main/scala/io/prediction/controller/PPreparator.scala deleted file mode 100644 index 154560b..0000000 --- a/core/src/main/scala/io/prediction/controller/PPreparator.scala +++ /dev/null @@ -1,44 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import io.prediction.core.BasePreparator -import org.apache.spark.SparkContext - -/** Base class of a parallel preparator. - * - * A parallel preparator can be run in parallel on a cluster and produces a - * prepared data that is distributed across a cluster. - * - * @tparam TD Training data class. - * @tparam PD Prepared data class. - * @group Preparator - */ -abstract class PPreparator[TD, PD] - extends BasePreparator[TD, PD] { - - def prepareBase(sc: SparkContext, td: TD): PD = { - prepare(sc, td) - } - - /** Implement this method to produce prepared data that is ready for model - * training. 
- * - * @param sc An Apache Spark context. - * @param trainingData Training data to be prepared. - */ - def prepare(sc: SparkContext, trainingData: TD): PD -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Params.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/Params.scala b/core/src/main/scala/io/prediction/controller/Params.scala deleted file mode 100644 index 0d5d149..0000000 --- a/core/src/main/scala/io/prediction/controller/Params.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -/** Base trait for all kinds of parameters that will be passed to constructors - * of different controller classes. - * - * @group Helper - */ -trait Params extends Serializable {} - -/** A concrete implementation of [[Params]] representing empty parameters. 
- * - * @group Helper - */ -case class EmptyParams() extends Params { - override def toString(): String = "Empty" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/PersistentModel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/PersistentModel.scala b/core/src/main/scala/io/prediction/controller/PersistentModel.scala deleted file mode 100644 index 5d0ec41..0000000 --- a/core/src/main/scala/io/prediction/controller/PersistentModel.scala +++ /dev/null @@ -1,112 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import org.apache.spark.SparkContext - -/** Mix in and implement this trait if your model cannot be persisted by - * PredictionIO automatically. A companion object extending - * PersistentModelLoader is required for PredictionIO to load the persisted - * model automatically during deployment. - * - * Notice that models generated by [[PAlgorithm]] cannot be persisted - * automatically by nature and must implement these traits if model persistence - * is desired. - * - * {{{ - * class MyModel extends PersistentModel[MyParams] { - * def save(id: String, params: MyParams, sc: SparkContext): Boolean = { - * ...
- * } - * } - * - * object MyModel extends PersistentModelLoader[MyParams, MyModel] { - * def apply(id: String, params: MyParams, sc: Option[SparkContext]): MyModel = { - * ... - * } - * } - * }}} - * - * In Java, all you need to do is to implement this interface, and add a static - * method with 3 arguments of type String, [[Params]], and SparkContext. - * - * {{{ - * public class MyModel implements PersistentModel<MyParams>, Serializable { - * ... - * public boolean save(String id, MyParams params, SparkContext sc) { - * ... - * } - * - * public static MyModel load(String id, Params params, SparkContext sc) { - * ... - * } - * ... - * } - * }}} - * - * @tparam AP Algorithm parameters class. - * @see [[PersistentModelLoader]] - * @group Algorithm - */ -trait PersistentModel[AP <: Params] { - /** Save the model to some persistent storage. - * - * This method should return true if the model has been saved successfully so - * that PredictionIO knows that it can be restored later during deployment. - * This method should return false if the model cannot be saved (or should - * not be saved due to configuration) so that PredictionIO will re-train the - * model during deployment. All arguments of this method are provided - * automatically by PredictionIO. - * - * @param id ID of the run that trained this model. - * @param params Algorithm parameters that were used to train this model. - * @param sc An Apache Spark context. - */ - def save(id: String, params: AP, sc: SparkContext): Boolean -} - -/** Implement an object that extends this trait for PredictionIO to support - * loading a persisted model during serving deployment. - * - * @tparam AP Algorithm parameters class. - * @tparam M Model class. - * @see [[PersistentModel]] - * @group Algorithm - */ -trait PersistentModelLoader[AP <: Params, M] { - /** Implement this method to restore a persisted model that extends the - * [[PersistentModel]] trait.
All arguments of this method are provided - * automatically by PredictionIO. - * - * @param id ID of the run that trained this model. - * @param params Algorithm parameters that were used to train this model. - * @param sc An optional Apache Spark context. This will be injected if the - * model was generated by a [[PAlgorithm]]. - */ - def apply(id: String, params: AP, sc: Option[SparkContext]): M -} - -/** DEPRECATED. Use [[PersistentModel]] instead. - * - * @group Algorithm */ -@deprecated("Use PersistentModel instead.", "0.9.2") -trait IPersistentModel[AP <: Params] extends PersistentModel[AP] - -/** DEPRECATED. Use [[PersistentModelLoader]] instead. - * - * @group Algorithm */ -@deprecated("Use PersistentModelLoader instead.", "0.9.2") -trait IPersistentModelLoader[AP <: Params, M] extends PersistentModelLoader[AP, M] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/SanityCheck.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/controller/SanityCheck.scala b/core/src/main/scala/io/prediction/controller/SanityCheck.scala deleted file mode 100644 index bb5342f..0000000 --- a/core/src/main/scala/io/prediction/controller/SanityCheck.scala +++ /dev/null @@ -1,30 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package io.prediction.controller - -/** Extend a data class with this trait if you want PredictionIO to - * automatically perform sanity checks on your data classes during training. - * This is very useful when you need to debug your engine. - * - * @group Helper - */ -trait SanityCheck { - /** Implement this method to perform checks on your data. This method should - * contain assertions that throw exceptions when your data does not meet - * your pre-defined requirements. - */ - def sanityCheck(): Unit -}
