/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package org.apache.predictionio.workflow

import java.io.File
import java.io.FileNotFoundException

import org.apache.predictionio.controller.EmptyParams
import org.apache.predictionio.controller.EngineFactory
import org.apache.predictionio.controller.EngineParamsGenerator
import org.apache.predictionio.controller.Evaluation
import org.apache.predictionio.controller.Params
import org.apache.predictionio.controller.PersistentModelLoader
import org.apache.predictionio.controller.Utils
import org.apache.predictionio.core.BuildInfo

import com.google.gson.Gson
import com.google.gson.JsonSyntaxException
import grizzled.slf4j.Logging
import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
import org.apache.log4j.Level
import org.apache.log4j.LogManager
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDDLike
import org.apache.spark.rdd.RDD
import org.json4s.JsonAST.JValue
import org.json4s.MappingException
import org.json4s._
import org.json4s.native.JsonMethods._

import scala.io.Source
import scala.language.existentials
import scala.reflect.runtime.universe

/** Collection of reusable workflow related utilities. */
object WorkflowUtils extends Logging {
  @transient private lazy val gson = new Gson

  /** Obtains an Engine object in Scala, or instantiates an Engine in Java.
    *
    * The name is first resolved as a Scala singleton through runtime
    * reflection on `cl`. If loading the singleton instance fails with
    * `NoSuchFieldException` or `ClassNotFoundException`, the name is treated
    * as a class and instantiated through its no-argument constructor.
    *
    * @param engine Engine factory name.
    * @param cl A Java ClassLoader to look for engine-related classes.
    * @return The detected language paired with the engine factory.
    *
    * @throws ClassNotFoundException
    *         Thrown when engine factory class does not exist.
    */
  def getEngine(engine: String, cl: ClassLoader): (EngineLanguage.Value, EngineFactory) = {
    val runtimeMirror = universe.runtimeMirror(cl)
    val engineModule = runtimeMirror.staticModule(engine)
    val engineObject = runtimeMirror.reflectModule(engineModule)
    try {
      (EngineLanguage.Scala, engineObject.instance.asInstanceOf[EngineFactory])
    } catch {
      // NOTE(review): the fallback lookup does not pass `cl` to Class.forName;
      // confirm that resolving against the default loader is intended.
      case _: NoSuchFieldException | _: ClassNotFoundException =>
        (EngineLanguage.Java, Class.forName(engine).newInstance.asInstanceOf[EngineFactory])
    }
  }

  /** Obtains an EngineParamsGenerator, using the same Scala-singleton-first,
    * class-instantiation-second strategy as [[getEngine]].
    *
    * @param epg Engine params generator name.
    * @param cl A Java ClassLoader used for the Scala reflection lookup.
    * @return The detected language paired with the generator.
    */
  def getEngineParamsGenerator(epg: String, cl: ClassLoader):
    (EngineLanguage.Value, EngineParamsGenerator) = {
    val runtimeMirror = universe.runtimeMirror(cl)
    val epgModule = runtimeMirror.staticModule(epg)
    val epgObject = runtimeMirror.reflectModule(epgModule)
    try {
      (EngineLanguage.Scala, epgObject.instance.asInstanceOf[EngineParamsGenerator])
    } catch {
      case _: NoSuchFieldException | _: ClassNotFoundException =>
        (EngineLanguage.Java, Class.forName(epg).newInstance.asInstanceOf[EngineParamsGenerator])
    }
  }

  /** Obtains an Evaluation, using the same Scala-singleton-first,
    * class-instantiation-second strategy as [[getEngine]].
    *
    * @param evaluation Evaluation name.
    * @param cl A Java ClassLoader used for the Scala reflection lookup.
    * @return The detected language paired with the evaluation.
    */
  def getEvaluation(evaluation: String, cl: ClassLoader): (EngineLanguage.Value, Evaluation) = {
    val runtimeMirror = universe.runtimeMirror(cl)
    val evaluationModule = runtimeMirror.staticModule(evaluation)
    val evaluationObject = runtimeMirror.reflectModule(evaluationModule)
    try {
      (EngineLanguage.Scala, evaluationObject.instance.asInstanceOf[Evaluation])
    } catch {
      case _: NoSuchFieldException | _: ClassNotFoundException =>
        (EngineLanguage.Java, Class.forName(evaluation).newInstance.asInstanceOf[Evaluation])
    }
  }

  /** Converts a JSON document to an instance of Params.
    *
    * The target Params class is taken from the first parameter of the first
    * public constructor of `clazz`. When that constructor takes no arguments,
    * `EmptyParams` is returned (with a warning if `json` is not empty).
    *
    * @param language Engine's programming language.
    * @param json JSON document.
    * @param clazz Class of the component that is going to receive the resulting
    *              Params instance as a constructor argument.
    * @param jsonExtractor JSON extractor option.
    * @param formats JSON4S serializers for deserialization.
    *
    * @throws MappingException Thrown when JSON4S fails to perform conversion.
    * @throws JsonSyntaxException Thrown when GSON fails to perform conversion.
    */
  def extractParams(
      language: EngineLanguage.Value = EngineLanguage.Scala,
      json: String,
      clazz: Class[_],
      jsonExtractor: JsonExtractorOption,
      formats: Formats = Utils.json4sDefaultFormats): Params = {
    implicit val f = formats
    val pClass = clazz.getConstructors.head.getParameterTypes
    if (pClass.size == 0) {
      if (json != "") {
        warn(s"Non-empty parameters supplied to ${clazz.getName}, but its " +
          "constructor does not accept any arguments. Stubbing with empty " +
          "parameters.")
      }
      EmptyParams()
    } else {
      val apClass = pClass.head
      try {
        JsonExtractor.extract(jsonExtractor, json, apClass, f).asInstanceOf[Params]
      } catch {
        case e @ (_: MappingException | _: JsonSyntaxException) =>
          error(
            s"Unable to extract parameters for ${apClass.getName} from " +
              s"JSON string: $json. Aborting workflow.",
            e)
          throw e
      }
    }
  }

  /** Looks up `field` in an engine variant JSON and extracts its name/params
    * pair, deserializing the params into the Params type expected by the
    * class registered under that name in `classMap`.
    *
    * Returns `("", EmptyParams())` when `field` is absent. Terminates the JVM
    * with exit code 1 when the named class is not present in `classMap`.
    *
    * @param variantJson Engine variant JSON.
    * @param field Name of the JSON field to look for.
    * @param classMap Component classes keyed by name.
    * @param engineLanguage Engine's programming language.
    * @param jsonExtractor JSON extractor option.
    * @return The component name paired with its extracted Params.
    */
  def getParamsFromJsonByFieldAndClass(
      variantJson: JValue,
      field: String,
      classMap: Map[String, Class[_]],
      engineLanguage: EngineLanguage.Value,
      jsonExtractor: JsonExtractorOption): (String, Params) = {
    variantJson findField {
      case JField(f, _) => f == field
      case _ => false
    } map { jv =>
      implicit lazy val formats = Utils.json4sDefaultFormats + new NameParamsSerializer
      val np: NameParams = try {
        jv._2.extract[NameParams]
      } catch {
        case e: Exception =>
          error(s"Unable to extract $field name and params $jv")
          throw e
      }
      val extractedParams = np.params.map { p =>
        try {
          if (!classMap.contains(np.name)) {
            error(s"Unable to find $field class with name '${np.name}'" +
              " defined in Engine.")
            sys.exit(1)
          }
          WorkflowUtils.extractParams(
            engineLanguage,
            compact(render(p)),
            classMap(np.name),
            jsonExtractor,
            formats)
        } catch {
          case e: Exception =>
            error(s"Unable to extract $field params $p")
            throw e
        }
      }.getOrElse(EmptyParams())

      (np.name, extractedParams)
    } getOrElse (("", EmptyParams()))
  }

  /** Grab environmental variables that starts with 'PIO_'. */
  def pioEnvVars: Map[String, String] =
    sys.env.filter(kv => kv._1.startsWith("PIO_"))

  /** Converts Java (non-Scala) objects to a JSON4S JValue.
    *
    * @param params The Java object to be converted.
    */
  def javaObjectToJValue(params: AnyRef): JValue = parse(gson.toJson(params))

  /** Starts an [[UpgradeCheckRunner]] on a new thread and returns immediately.
    *
    * @param component Component name used to build the version URL.
    * @param engine Optional engine name; empty for a component-level check.
    */
  private[predictionio] def checkUpgrade(
      component: String = "core",
      engine: String = ""): Unit = {
    val runner = new Thread(new UpgradeCheckRunner(component, engine))
    runner.start()
  }

  /** Extract debug string by recursively traversing the data.
    *
    * RDDs are collected first, arrays are rendered as `[a,b,...]`, `null` is
    * rendered as `"null"` and anything else uses `toString`.
    */
  def debugString[D](data: D): String = data match {
    case rdd: RDD[_] => debugString(rdd.collect())
    case javaRdd: JavaRDDLike[_, _] => debugString(javaRdd.collect())
    case array: Array[_] => "[" + array.map(debugString).mkString(",") + "]"
    case d: AnyRef => d.toString
    case null => "null"
  }

  /** Detect third party software configuration files to be submitted as
    * extras to Apache Spark. This makes sure all executors receive the same
    * configuration.
    *
    * @return Paths of the known configuration files that exist on disk, one
    *         per environment variable that is set.
    */
  def thirdPartyConfFiles: Seq[String] = {
    val thirdPartyFiles = Map(
      "PIO_CONF_DIR" -> "log4j.properties",
      "ES_CONF_DIR" -> "elasticsearch.yml",
      "HADOOP_CONF_DIR" -> "core-site.xml",
      "HBASE_CONF_DIR" -> "hbase-site.xml")

    thirdPartyFiles.keys.toSeq.map { k: String =>
      sys.env.get(k) map { x =>
        val p = Seq(x, thirdPartyFiles(k)).mkString(File.separator)
        if (new File(p).exists) Seq(p) else Seq[String]()
      } getOrElse Seq[String]()
    }.flatten
  }

  /** Collects the values of the third party classpath-related environment
    * variables that are set, in the order they are listed here.
    */
  def thirdPartyClasspaths: Seq[String] = {
    val thirdPartyPaths = Seq(
      "PIO_CONF_DIR",
      "ES_CONF_DIR",
      "POSTGRES_JDBC_DRIVER",
      "MYSQL_JDBC_DRIVER",
      "HADOOP_CONF_DIR",
      "HBASE_CONF_DIR")
    thirdPartyPaths.map(p =>
      sys.env.get(p).map(Seq(_)).getOrElse(Seq[String]())
    ).flatten
  }

  /** Adjusts log4j levels: the root logger is set to TRACE (verbose) or INFO,
    * and a fixed set of chatty third party loggers to INFO (verbose) or WARN.
    *
    * @param verbose Whether verbose logging is requested.
    */
  def modifyLogging(verbose: Boolean): Unit = {
    val rootLoggerLevel = if (verbose) Level.TRACE else Level.INFO
    val chattyLoggerLevel = if (verbose) Level.INFO else Level.WARN

    LogManager.getRootLogger.setLevel(rootLoggerLevel)

    LogManager.getLogger("org.elasticsearch").setLevel(chattyLoggerLevel)
    LogManager.getLogger("org.apache.hadoop").setLevel(chattyLoggerLevel)
    LogManager.getLogger("org.apache.spark").setLevel(chattyLoggerLevel)
    LogManager.getLogger("org.eclipse.jetty").setLevel(chattyLoggerLevel)
    LogManager.getLogger("akka").setLevel(chattyLoggerLevel)
  }

  /** Reads the optional 'name' and 'params' fields of a JSON value.
    *
    * Terminates the JVM with exit code 1 when both fields are missing. A
    * missing 'name' defaults to the empty string; a missing 'params' is kept
    * as `None`.
    *
    * @param jv JSON value holding the 'name' and/or 'params' fields.
    */
  def extractNameParams(jv: JValue): NameParams = {
    implicit val formats = Utils.json4sDefaultFormats
    val nameOpt = (jv \ "name").extract[Option[String]]
    val paramsOpt = (jv \ "params").extract[Option[JValue]]

    if (nameOpt.isEmpty && paramsOpt.isEmpty) {
      error("Unable to find 'name' or 'params' fields in" +
        s" ${compact(render(jv))}.\n" +
        "Since 0.8.4, the 'params' field is required in engine.json" +
        " in order to specify parameters for DataSource, Preparator or" +
        " Serving.\n" +
        "Please go to https://docs.prediction.io/resources/upgrade/" +
        " for detailed instruction of how to change engine.json.")
      sys.exit(1)
    }

    if (nameOpt.isEmpty) {
      info(s"No 'name' is found. Default empty String will be used.")
    }

    if (paramsOpt.isEmpty) {
      info(s"No 'params' is found. Default EmptyParams will be used.")
    }

    NameParams(
      name = nameOpt.getOrElse(""),
      params = paramsOpt
    )
  }

  /** Flattens the 'sparkConf' section of an engine JSON into dotted
    * key/value pairs, e.g. `{"a": {"b": 1}}` becomes `("a.b", "1")`.
    *
    * Terminates the JVM with exit code 1 when the section contains an array.
    *
    * @param root Engine JSON document.
    */
  def extractSparkConf(root: JValue): List[(String, String)] = {
    def flatten(jv: JValue): List[(List[String], String)] = {
      jv match {
        case JObject(fields) =>
          for ((namePrefix, childJV) <- fields;
               (name, value) <- flatten(childJV))
          yield (namePrefix :: name) -> value
        case JArray(_) => {
          error("Arrays are not allowed in the sparkConf section of engine.json.")
          sys.exit(1)
        }
        case JNothing => List()
        case _ => List(List() -> jv.values.toString)
      }
    }

    flatten(root \ "sparkConf").map(x =>
      (x._1.reduce((a, b) => s"$a.$b"), x._2))
  }
}

/** A component name together with its optional, still unparsed, parameters. */
case class NameParams(name: String, params: Option[JValue])

/** JSON4S serializer for [[NameParams]]; deserialization is delegated to
  * `WorkflowUtils.extractNameParams`.
  */
class NameParamsSerializer extends CustomSerializer[NameParams](format => ( {
  case jv: JValue => WorkflowUtils.extractNameParams(jv)
}, {
  case x: NameParams =>
    JObject(JField("name", JString(x.name)) ::
      JField("params", x.params.getOrElse(JNothing)) :: Nil)
}
  ))

/** Collection of reusable workflow related utilities that touch on Apache
  * Spark. They are separated to avoid compilation problems with certain code.
  */
object SparkWorkflowUtils extends Logging {
  /** Loads a persistent model through its loader.
    *
    * The manifest's class name is first resolved as a Scala singleton and
    * applied as a `PersistentModelLoader`. If loading the singleton fails with
    * `NoSuchFieldException` or `ClassNotFoundException`, a static
    * `load(String, Params, SparkContext)` method is invoked reflectively.
    *
    * @param pmm Manifest naming the model class.
    * @param runId Passed through to the loader.
    * @param params Passed through to the loader.
    * @param sc Optional SparkContext; passed as `null` to the reflective
    *           `load` method when absent.
    * @param cl A Java ClassLoader used for the Scala reflection lookup.
    */
  def getPersistentModel[AP <: Params, M](
      pmm: PersistentModelManifest,
      runId: String,
      params: AP,
      sc: Option[SparkContext],
      cl: ClassLoader): M = {
    val runtimeMirror = universe.runtimeMirror(cl)
    val pmmModule = runtimeMirror.staticModule(pmm.className)
    val pmmObject = runtimeMirror.reflectModule(pmmModule)
    try {
      pmmObject.instance.asInstanceOf[PersistentModelLoader[AP, M]](
        runId,
        params,
        sc)
    } catch {
      case _: NoSuchFieldException | _: ClassNotFoundException => try {
        val loadMethod = Class.forName(pmm.className).getMethod(
          "load",
          classOf[String],
          classOf[Params],
          classOf[SparkContext])
        loadMethod.invoke(null, runId, params, sc.orNull).asInstanceOf[M]
      } catch {
        case e: ClassNotFoundException =>
          error(s"Model class ${pmm.className} cannot be found.")
          throw e
        case e: NoSuchMethodException =>
          error(
            "The load(String, Params, SparkContext) method cannot be found.")
          throw e
      }
    }
  }
}

/** Fetches version metadata for a component (and optionally an engine) from
  * the versions host. Failures are logged at debug level only.
  *
  * @param component Component name used to build the metadata URL.
  * @param engine Engine name; when empty the component-level URL is used.
  */
class UpgradeCheckRunner(
    val component: String,
    val engine: String) extends Runnable with Logging {
  val version = BuildInfo.version
  val versionsHost = "https://direct.prediction.io/"

  def run(): Unit = {
    val url = if (engine == "") {
      s"$versionsHost$version/$component.json"
    } else {
      s"$versionsHost$version/$component/$engine.json"
    }
    try {
      val upgradeData = Source.fromURL(url)
      try {
        // TODO: Implement upgrade logic
      } finally {
        // The stream is opened by fromURL, so release it even though the
        // data is not consumed yet.
        upgradeData.close()
      }
    } catch {
      case e: FileNotFoundException =>
        debug(s"Update metainfo not found. $url")
      // Covers UnknownHostException and any other network failure so the
      // background check never ends with an uncaught exception.
      case e: java.io.IOException =>
        debug(s"${e.getClass.getName}: ${e.getMessage}")
    }
  }
}

/** Base exception for deliberately interrupting a workflow. */
class WorkflowInterruption() extends Exception

case class StopAfterReadInterruption() extends WorkflowInterruption

case class StopAfterPrepareInterruption() extends WorkflowInterruption

/** Programming language an engine component is implemented in. */
object EngineLanguage extends Enumeration {
  val Scala, Java = Value
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html ---------------------------------------------------------------------- diff --git a/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html b/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html deleted file mode 100644 index 2e679a5..0000000 --- a/core/src/main/twirl/io/prediction/controller/metric_evaluator.scala.html +++ /dev/null @@ -1,95 +0,0 @@ -<html> - <head> - <script type='text/javascript' src='https://www.google.com/jsapi'></script> - <script src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script> - <script> - google.load('visualization', '1', {packages:['table', 'corechart',]}); - </script> - </head> - <body> - <h1>Metric Evaluator</h1> - <div id='debug'></div> - <div id='table'> - <h3>Engine Params Evaluation Results</h3> - <div>Click on table to view the engine params</div> - </div> - <pre id='engineParams'></div> - <script type='text/javascript'> - google.setOnLoadCallback(load); - - //var url = 'http://localhost:9000/engine_instances/ky01Q-glQheNE_s885JTSg/local_evaluator_results.json'; - var url = 'evaluator_results.json'; - var rawData; - var metricHeader; - var otherMetricHeaders; - var engineParamsScores; - var table; - var dataTable; - - function load() { - rawData = JSON.parse( - jQuery.ajax({ - url: url, - dataType: 'json', - async: false, - }).responseText); - - metricHeader = rawData['metricHeader']; - otherMetricHeaders = rawData['otherMetricHeaders']; - engineParamsScores = rawData['engineParamsScores']; - - drawTable(); - } - - function tableSelectHandler() { - var selection = table.getSelection(); - if (selection.length > 0) { - var row = selection[0].row; - var idx = dataTable.getValue(row, 0); - var engineParams = engineParamsScores[idx]._1; - - document.getElementById('engineParams').innerHTML = JSON.stringify( - engineParams, 
null, 2); - } else { - document.getElementById('engineParams').innerHTML = ""; - } - } - - function drawTable() { - var tableDiv = document.createElement('div'); - document.getElementById('table').appendChild(tableDiv); - - - var dataArray = []; - - var headers = ['Index', 'Best', metricHeader].concat(otherMetricHeaders); - dataArray.push(headers); - - for (epIdx = 0; epIdx < engineParamsScores.length; epIdx++) { - var epScore = engineParamsScores[epIdx]; - var isBest = (epIdx == rawData.bestIdx ? "*" : ""); - dataArray.push([epIdx, isBest, epScore._2.score].concat(epScore._2.otherScores)); - } - - dataTable = google.visualization.arrayToDataTable(dataArray, false); - - // formatter - var numberFormatter = new google.visualization.NumberFormat({fractionDigits: 4}); - - for (colIdx = 1; colIdx < dataTable.getNumberOfColumns(); colIdx++) { - if (dataTable.getColumnType(colIdx) == "number") { - numberFormatter.format(dataTable, colIdx); - } - } - - table = new google.visualization.Table(tableDiv); - - // select handler - google.visualization.events.addListener(table, 'select', tableSelectHandler); - - table.draw(dataTable); - } - - </script> - </body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/io/prediction/workflow/index.scala.html ---------------------------------------------------------------------- diff --git a/core/src/main/twirl/io/prediction/workflow/index.scala.html b/core/src/main/twirl/io/prediction/workflow/index.scala.html deleted file mode 100644 index 4e0707b..0000000 --- a/core/src/main/twirl/io/prediction/workflow/index.scala.html +++ /dev/null @@ -1,92 +0,0 @@ -@import io.prediction.data.storage.EngineInstance -@import io.prediction.data.storage.EngineManifest -@import io.prediction.workflow.ServerConfig -@import org.joda.time.DateTime -@import org.joda.time.format.DateTimeFormat -@(args: ServerConfig, - manifest: EngineManifest, - engineInstance: EngineInstance, - algorithms: Seq[String], 
- algorithmsParams: Seq[String], - models: Seq[String], - dataSourceParams: String, - preparatorParams: String, - servingParams: String, - serverStartTime: DateTime, - feedback: Boolean, - eventServerIp: String, - eventServerPort: Int, - requestCount: Int, - avgServingSec: Double, - lastServingSec: Double - ) -<!DOCTYPE html> -<html lang="en"> - <head> - <title>@{engineInstance.engineFactory} (@{engineInstance.engineVariant}) - PredictionIO Engine Server at @{args.ip}:@{args.port}</title> - <link href="/assets/bootstrap-3.2.0-dist/css/bootstrap.min.css" rel="stylesheet"> - <style type="text/css"> - td { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; } - </style> - </head> - <body> - <div class="container"> - <div class="page-header"> - <h1>PredictionIO Engine Server at @{args.ip}:@{args.port}</h1> - <p class="lead">@{engineInstance.engineFactory} (@{engineInstance.engineVariant})</p> - </div> - <h2>Engine Information</h2> - <table class="table table-bordered table-striped"> - <tr><th>Training Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.startTime)}</td></tr> - <tr><th>Training End Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.endTime)}</td></tr> - <tr><th>Variant ID</th><td>@{engineInstance.engineVariant}</td></tr> - <tr><th>Instance ID</th><td>@{engineInstance.id}</td></tr> - </table> - <h2>Server Information</h2> - <table class="table table-bordered table-striped"> - <tr><th>Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(serverStartTime)}</td></tr> - <tr><th>Request Count</th><td>@{requestCount}</td></tr> - <tr><th>Average Serving Time</th><td>@{f"${avgServingSec}%.4f"} seconds</td></tr> - <tr><th>Last Serving Time</th><td>@{f"${lastServingSec}%.4f"} seconds</td></tr> - <tr><th>Engine Factory Class (Scala/Java)</th><td>@{engineInstance.engineFactory}</td></tr> - <tr> - <th rowspan="@(manifest.files.size)">Library Files</th> - <td>@{manifest.files.head}</td> - </tr> - @for(f <- 
manifest.files.drop(1)) { - <tr><td>@f</td></tr> - } - </table> - <h2>Data Source</h2> - <table class="table table-bordered table-striped"> - <tr><th>Parameters</th><td>@{dataSourceParams}</td></tr> - </table> - <h2>Data Preparator</h2> - <table class="table table-bordered table-striped"> - <tr><th>Parameters</th><td>@{preparatorParams}</td></tr> - </table> - <h2>Algorithms and Models</h2> - <table class="table table-bordered table-striped"> - <tr><th>#</th><th colspan="2">Information</th></tr> - @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) { - <tr> - <th rowspan="3">@{a._2 + 1}</th> - <th>Class</th><td>@{a._1._1._1}</td> - </tr> - <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr> - <tr><th>Model</th><td>@{a._1._2}</td></tr> - } - </table> - <h2>Serving</h2> - <table class="table table-bordered table-striped"> - <tr><th>Parameters</th><td>@{servingParams}</td></tr> - </table> - <h2>Feedback Loop Information</h2> - <table class="table table-bordered table-striped"> - <tr><th>Feedback Loop Enabled?</th><td>@{feedback}</td></tr> - <tr><th>Event Server IP</th><td>@{eventServerIp}</td></tr> - <tr><th>Event Server Port</th><td>@{eventServerPort}</td></tr> - </table> - </div> - </body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html ---------------------------------------------------------------------- diff --git a/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html b/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html new file mode 100644 index 0000000..2e679a5 --- /dev/null +++ b/core/src/main/twirl/org/apache/predictionio/controller/metric_evaluator.scala.html @@ -0,0 +1,95 @@ +<html> + <head> + <script type='text/javascript' src='https://www.google.com/jsapi'></script> + <script src="http://ajax.googleapis.com/ajax/libs/jquery/2.1.1/jquery.min.js"></script> + 
<script> + google.load('visualization', '1', {packages:['table', 'corechart',]}); + </script> + </head> + <body> + <h1>Metric Evaluator</h1> + <div id='debug'></div> + <div id='table'> + <h3>Engine Params Evaluation Results</h3> + <div>Click on table to view the engine params</div> + </div> + <pre id='engineParams'></div> + <script type='text/javascript'> + google.setOnLoadCallback(load); + + //var url = 'http://localhost:9000/engine_instances/ky01Q-glQheNE_s885JTSg/local_evaluator_results.json'; + var url = 'evaluator_results.json'; + var rawData; + var metricHeader; + var otherMetricHeaders; + var engineParamsScores; + var table; + var dataTable; + + function load() { + rawData = JSON.parse( + jQuery.ajax({ + url: url, + dataType: 'json', + async: false, + }).responseText); + + metricHeader = rawData['metricHeader']; + otherMetricHeaders = rawData['otherMetricHeaders']; + engineParamsScores = rawData['engineParamsScores']; + + drawTable(); + } + + function tableSelectHandler() { + var selection = table.getSelection(); + if (selection.length > 0) { + var row = selection[0].row; + var idx = dataTable.getValue(row, 0); + var engineParams = engineParamsScores[idx]._1; + + document.getElementById('engineParams').innerHTML = JSON.stringify( + engineParams, null, 2); + } else { + document.getElementById('engineParams').innerHTML = ""; + } + } + + function drawTable() { + var tableDiv = document.createElement('div'); + document.getElementById('table').appendChild(tableDiv); + + + var dataArray = []; + + var headers = ['Index', 'Best', metricHeader].concat(otherMetricHeaders); + dataArray.push(headers); + + for (epIdx = 0; epIdx < engineParamsScores.length; epIdx++) { + var epScore = engineParamsScores[epIdx]; + var isBest = (epIdx == rawData.bestIdx ? 
"*" : ""); + dataArray.push([epIdx, isBest, epScore._2.score].concat(epScore._2.otherScores)); + } + + dataTable = google.visualization.arrayToDataTable(dataArray, false); + + // formatter + var numberFormatter = new google.visualization.NumberFormat({fractionDigits: 4}); + + for (colIdx = 1; colIdx < dataTable.getNumberOfColumns(); colIdx++) { + if (dataTable.getColumnType(colIdx) == "number") { + numberFormatter.format(dataTable, colIdx); + } + } + + table = new google.visualization.Table(tableDiv); + + // select handler + google.visualization.events.addListener(table, 'select', tableSelectHandler); + + table.draw(dataTable); + } + + </script> + </body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html ---------------------------------------------------------------------- diff --git a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html new file mode 100644 index 0000000..4e0707b --- /dev/null +++ b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html @@ -0,0 +1,92 @@ +@import io.prediction.data.storage.EngineInstance +@import io.prediction.data.storage.EngineManifest +@import io.prediction.workflow.ServerConfig +@import org.joda.time.DateTime +@import org.joda.time.format.DateTimeFormat +@(args: ServerConfig, + manifest: EngineManifest, + engineInstance: EngineInstance, + algorithms: Seq[String], + algorithmsParams: Seq[String], + models: Seq[String], + dataSourceParams: String, + preparatorParams: String, + servingParams: String, + serverStartTime: DateTime, + feedback: Boolean, + eventServerIp: String, + eventServerPort: Int, + requestCount: Int, + avgServingSec: Double, + lastServingSec: Double + ) +<!DOCTYPE html> +<html lang="en"> + <head> + <title>@{engineInstance.engineFactory} (@{engineInstance.engineVariant}) - PredictionIO Engine Server at 
@{args.ip}:@{args.port}</title> + <link href="/assets/bootstrap-3.2.0-dist/css/bootstrap.min.css" rel="stylesheet"> + <style type="text/css"> + td { font-family: Menlo, Monaco, Consolas, "Courier New", monospace; } + </style> + </head> + <body> + <div class="container"> + <div class="page-header"> + <h1>PredictionIO Engine Server at @{args.ip}:@{args.port}</h1> + <p class="lead">@{engineInstance.engineFactory} (@{engineInstance.engineVariant})</p> + </div> + <h2>Engine Information</h2> + <table class="table table-bordered table-striped"> + <tr><th>Training Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.startTime)}</td></tr> + <tr><th>Training End Time</th><td>@{DateTimeFormat.forStyle("FF").print(engineInstance.endTime)}</td></tr> + <tr><th>Variant ID</th><td>@{engineInstance.engineVariant}</td></tr> + <tr><th>Instance ID</th><td>@{engineInstance.id}</td></tr> + </table> + <h2>Server Information</h2> + <table class="table table-bordered table-striped"> + <tr><th>Start Time</th><td>@{DateTimeFormat.forStyle("FF").print(serverStartTime)}</td></tr> + <tr><th>Request Count</th><td>@{requestCount}</td></tr> + <tr><th>Average Serving Time</th><td>@{f"${avgServingSec}%.4f"} seconds</td></tr> + <tr><th>Last Serving Time</th><td>@{f"${lastServingSec}%.4f"} seconds</td></tr> + <tr><th>Engine Factory Class (Scala/Java)</th><td>@{engineInstance.engineFactory}</td></tr> + <tr> + <th rowspan="@(manifest.files.size)">Library Files</th> + <td>@{manifest.files.head}</td> + </tr> + @for(f <- manifest.files.drop(1)) { + <tr><td>@f</td></tr> + } + </table> + <h2>Data Source</h2> + <table class="table table-bordered table-striped"> + <tr><th>Parameters</th><td>@{dataSourceParams}</td></tr> + </table> + <h2>Data Preparator</h2> + <table class="table table-bordered table-striped"> + <tr><th>Parameters</th><td>@{preparatorParams}</td></tr> + </table> + <h2>Algorithms and Models</h2> + <table class="table table-bordered table-striped"> + <tr><th>#</th><th 
colspan="2">Information</th></tr> + @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) { + <tr> + <th rowspan="3">@{a._2 + 1}</th> + <th>Class</th><td>@{a._1._1._1}</td> + </tr> + <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr> + <tr><th>Model</th><td>@{a._1._2}</td></tr> + } + </table> + <h2>Serving</h2> + <table class="table table-bordered table-striped"> + <tr><th>Parameters</th><td>@{servingParams}</td></tr> + </table> + <h2>Feedback Loop Information</h2> + <table class="table table-bordered table-striped"> + <tr><th>Feedback Loop Enabled?</th><td>@{feedback}</td></tr> + <tr><th>Event Server IP</th><td>@{eventServerIp}</td></tr> + <tr><th>Event Server Port</th><td>@{eventServerPort}</td></tr> + </table> + </div> + </body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaParams.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/io/prediction/workflow/JavaParams.java b/core/src/test/java/io/prediction/workflow/JavaParams.java deleted file mode 100644 index 65108b5..0000000 --- a/core/src/test/java/io/prediction/workflow/JavaParams.java +++ /dev/null @@ -1,30 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.workflow; - -import io.prediction.controller.Params; - -public class JavaParams implements Params { - private final String p; - - public JavaParams(String p) { - this.p = p; - } - - public String getP() { - return p; - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaQuery.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/io/prediction/workflow/JavaQuery.java b/core/src/test/java/io/prediction/workflow/JavaQuery.java deleted file mode 100644 index 1630a2d..0000000 --- a/core/src/test/java/io/prediction/workflow/JavaQuery.java +++ /dev/null @@ -1,46 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow; - -import java.io.Serializable; - -public class JavaQuery implements Serializable{ - private final String q; - - public JavaQuery(String q) { - this.q = q; - } - - public String getQ() { - return q; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - JavaQuery javaQuery = (JavaQuery) o; - - return !(q != null ? !q.equals(javaQuery.q) : javaQuery.q != null); - - } - - @Override - public int hashCode() { - return q != null ? 
q.hashCode() : 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java b/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java deleted file mode 100644 index 409859d..0000000 --- a/core/src/test/java/io/prediction/workflow/JavaQueryTypeAdapterFactory.java +++ /dev/null @@ -1,60 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.workflow; - -import com.google.gson.Gson; -import com.google.gson.TypeAdapter; -import com.google.gson.TypeAdapterFactory; -import com.google.gson.reflect.TypeToken; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; -import com.google.gson.stream.JsonWriter; - -import java.io.IOException; - -public class JavaQueryTypeAdapterFactory implements TypeAdapterFactory { - @Override - public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) { - if (type.getRawType().equals(JavaQuery.class)) { - return (TypeAdapter<T>) new TypeAdapter<JavaQuery>() { - public void write(JsonWriter out, JavaQuery value) throws IOException { - if (value == null) { - out.nullValue(); - } else { - out.beginObject(); - out.name("q").value(value.getQ().toUpperCase()); - out.endObject(); - } - } - - public JavaQuery read(JsonReader reader) throws IOException { - if (reader.peek() == JsonToken.NULL) { - reader.nextNull(); - return null; - } else { - reader.beginObject(); - reader.nextName(); - String q = reader.nextString(); - reader.endObject(); - return new JavaQuery(q.toUpperCase()); - } - } - }; - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java b/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java new file mode 100644 index 0000000..982ecbf --- /dev/null +++ b/core/src/test/java/org/apache/predictionio/workflow/JavaParams.java @@ -0,0 +1,30 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.workflow; + +import org.apache.predictionio.controller.Params; + +public class JavaParams implements Params { + private final String p; + + public JavaParams(String p) { + this.p = p; + } + + public String getP() { + return p; + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java b/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java new file mode 100644 index 0000000..f4a6359 --- /dev/null +++ b/core/src/test/java/org/apache/predictionio/workflow/JavaQuery.java @@ -0,0 +1,46 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.workflow; + +import java.io.Serializable; + +public class JavaQuery implements Serializable{ + private final String q; + + public JavaQuery(String q) { + this.q = q; + } + + public String getQ() { + return q; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + JavaQuery javaQuery = (JavaQuery) o; + + return !(q != null ? !q.equals(javaQuery.q) : javaQuery.q != null); + + } + + @Override + public int hashCode() { + return q != null ? q.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java b/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java new file mode 100644 index 0000000..46854d6 --- /dev/null +++ b/core/src/test/java/org/apache/predictionio/workflow/JavaQueryTypeAdapterFactory.java @@ -0,0 +1,60 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.workflow; + +import com.google.gson.Gson; +import com.google.gson.TypeAdapter; +import com.google.gson.TypeAdapterFactory; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonToken; +import com.google.gson.stream.JsonWriter; + +import java.io.IOException; + +public class JavaQueryTypeAdapterFactory implements TypeAdapterFactory { + @Override + public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) { + if (type.getRawType().equals(JavaQuery.class)) { + return (TypeAdapter<T>) new TypeAdapter<JavaQuery>() { + public void write(JsonWriter out, JavaQuery value) throws IOException { + if (value == null) { + out.nullValue(); + } else { + out.beginObject(); + out.name("q").value(value.getQ().toUpperCase()); + out.endObject(); + } + } + + public JavaQuery read(JsonReader reader) throws IOException { + if (reader.peek() == JsonToken.NULL) { + reader.nextNull(); + return null; + } else { + reader.beginObject(); + reader.nextName(); + String q = reader.nextString(); + reader.endObject(); + return new JavaQuery(q.toUpperCase()); + } + } + }; + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EngineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/controller/EngineTest.scala b/core/src/test/scala/io/prediction/controller/EngineTest.scala deleted file mode 100644 index cc84249..0000000 --- a/core/src/test/scala/io/prediction/controller/EngineTest.scala +++ /dev/null @@ -1,615 +0,0 @@ -package io.prediction.controller - -import io.prediction.workflow.PersistentModelManifest -import io.prediction.workflow.SharedSparkContext -import io.prediction.workflow.StopAfterPrepareInterruption -import io.prediction.workflow.StopAfterReadInterruption - -import grizzled.slf4j.Logger -import 
io.prediction.workflow.WorkflowParams -import org.apache.spark.rdd.RDD -import org.scalatest.Inspectors._ -import org.scalatest.Matchers._ -import org.scalatest.FunSuite -import org.scalatest.Inside - -import scala.util.Random - -class EngineSuite -extends FunSuite with Inside with SharedSparkContext { - import io.prediction.controller.Engine0._ - @transient lazy val logger = Logger[this.type] - - test("Engine.train") { - val engine = new Engine( - classOf[PDataSource2], - classOf[PPreparator1], - Map("" -> classOf[PAlgo2]), - classOf[LServing1]) - - val engineParams = EngineParams( - dataSourceParams = PDataSource2.Params(0), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq(("", PAlgo2.Params(2))), - servingParams = LServing1.Params(3)) - - val models = engine.train( - sc, - engineParams, - engineInstanceId = "", - params = WorkflowParams()) - - val pd = ProcessedData(1, TrainingData(0)) - - // PAlgo2.Model doesn't have IPersistentModel trait implemented. Hence the - // model extract after train is Unit. 
- models should contain theSameElementsAs Seq(Unit) - } - - test("Engine.train persisting PAlgo.Model") { - val engine = new Engine( - classOf[PDataSource2], - classOf[PPreparator1], - Map( - "PAlgo2" -> classOf[PAlgo2], - "PAlgo3" -> classOf[PAlgo3] - ), - classOf[LServing1]) - - val engineParams = EngineParams( - dataSourceParams = PDataSource2.Params(0), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq( - ("PAlgo2", PAlgo2.Params(2)), - ("PAlgo3", PAlgo3.Params(21)), - ("PAlgo3", PAlgo3.Params(22)) - ), - servingParams = LServing1.Params(3)) - - val pd = ProcessedData(1, TrainingData(0)) - val model21 = PAlgo3.Model(21, pd) - val model22 = PAlgo3.Model(22, pd) - - val models = engine.train( - sc, - engineParams, - engineInstanceId = "", - params = WorkflowParams()) - - val pModel21 = PersistentModelManifest(model21.getClass.getName) - val pModel22 = PersistentModelManifest(model22.getClass.getName) - - models should contain theSameElementsAs Seq(Unit, pModel21, pModel22) - } - - test("Engine.train persisting LAlgo.Model") { - val engine = Engine( - classOf[LDataSource1], - classOf[LPreparator1], - Map( - "LAlgo1" -> classOf[LAlgo1], - "LAlgo2" -> classOf[LAlgo2], - "LAlgo3" -> classOf[LAlgo3] - ), - classOf[LServing1]) - - val engineParams = EngineParams( - dataSourceParams = LDataSource1.Params(0), - preparatorParams = LPreparator1.Params(1), - algorithmParamsList = Seq( - ("LAlgo2", LAlgo2.Params(20)), - ("LAlgo2", LAlgo2.Params(21)), - ("LAlgo3", LAlgo3.Params(22))), - servingParams = LServing1.Params(3)) - - val pd = ProcessedData(1, TrainingData(0)) - val model20 = LAlgo2.Model(20, pd) - val model21 = LAlgo2.Model(21, pd) - val model22 = LAlgo3.Model(22, pd) - - //val models = engine.train(sc, engineParams, WorkflowParams()) - val models = engine.train( - sc, - engineParams, - engineInstanceId = "", - params = WorkflowParams()) - - val pModel20 = PersistentModelManifest(model20.getClass.getName) - val pModel21 = 
PersistentModelManifest(model21.getClass.getName) - - models should contain theSameElementsAs Seq(pModel20, pModel21, model22) - } - - test("Engine.train persisting P&NAlgo.Model") { - val engine = new Engine( - classOf[PDataSource2], - classOf[PPreparator1], - Map( - "PAlgo2" -> classOf[PAlgo2], - "PAlgo3" -> classOf[PAlgo3], - "NAlgo2" -> classOf[NAlgo2], - "NAlgo3" -> classOf[NAlgo3] - ), - classOf[LServing1]) - - val engineParams = EngineParams( - dataSourceParams = PDataSource2.Params(0), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq( - ("PAlgo2", PAlgo2.Params(20)), - ("PAlgo3", PAlgo3.Params(21)), - ("PAlgo3", PAlgo3.Params(22)), - ("NAlgo2", NAlgo2.Params(23)), - ("NAlgo3", NAlgo3.Params(24)), - ("NAlgo3", NAlgo3.Params(25)) - ), - servingParams = LServing1.Params(3)) - - val pd = ProcessedData(1, TrainingData(0)) - val model21 = PAlgo3.Model(21, pd) - val model22 = PAlgo3.Model(22, pd) - val model23 = NAlgo2.Model(23, pd) - val model24 = NAlgo3.Model(24, pd) - val model25 = NAlgo3.Model(25, pd) - - //val models = engine.train(sc, engineParams, WorkflowParams()) - val models = engine.train( - sc, - engineParams, - engineInstanceId = "", - params = WorkflowParams()) - - val pModel21 = PersistentModelManifest(model21.getClass.getName) - val pModel22 = PersistentModelManifest(model22.getClass.getName) - val pModel23 = PersistentModelManifest(model23.getClass.getName) - - models should contain theSameElementsAs Seq( - Unit, pModel21, pModel22, pModel23, model24, model25) - } - - test("Engine.eval") { - val engine = new Engine( - classOf[PDataSource2], - classOf[PPreparator1], - Map("" -> classOf[PAlgo2]), - classOf[LServing1]) - - val qn = 10 - val en = 3 - - val engineParams = EngineParams( - dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq(("", PAlgo2.Params(2))), - servingParams = LServing1.Params(3)) - - val algoCount = 
engineParams.algorithmParamsList.size - val pd = ProcessedData(1, TrainingData(0)) - val model0 = PAlgo2.Model(2, pd) - - val evalDataSet = engine.eval(sc, engineParams, WorkflowParams()) - - evalDataSet should have size en - - forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { - val (evalInfo, qpaRDD) = evalData - evalInfo shouldBe EvalInfo(0) - - val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect - - qpaSeq should have size qn - - forAll (qpaSeq) { case (q, p, a) => - val Query(qId, qEx, qQx, _) = q - val Actual(aId, aEx, aQx) = a - qId shouldBe aId - qEx shouldBe ex - aEx shouldBe ex - qQx shouldBe aQx - - inside (p) { case Prediction(pId, pQ, pModels, pPs) => { - pId shouldBe 3 - pQ shouldBe q - pModels shouldBe None - pPs should have size algoCount - pPs shouldBe Seq( - Prediction(id = 2, q = q, models = Some(model0))) - }} - } - }} - } - - test("Engine.prepareDeploy PAlgo") { - val engine = new Engine( - classOf[PDataSource2], - classOf[PPreparator1], - Map( - "PAlgo2" -> classOf[PAlgo2], - "PAlgo3" -> classOf[PAlgo3], - "NAlgo2" -> classOf[NAlgo2], - "NAlgo3" -> classOf[NAlgo3] - ), - classOf[LServing1]) - - val engineParams = EngineParams( - dataSourceParams = PDataSource2.Params(0), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq( - ("PAlgo2", PAlgo2.Params(20)), - ("PAlgo3", PAlgo3.Params(21)), - ("PAlgo3", PAlgo3.Params(22)), - ("NAlgo2", NAlgo2.Params(23)), - ("NAlgo3", NAlgo3.Params(24)), - ("NAlgo3", NAlgo3.Params(25)) - ), - servingParams = LServing1.Params(3)) - - val pd = ProcessedData(1, TrainingData(0)) - val model20 = PAlgo2.Model(20, pd) - val model21 = PAlgo3.Model(21, pd) - val model22 = PAlgo3.Model(22, pd) - val model23 = NAlgo2.Model(23, pd) - val model24 = NAlgo3.Model(24, pd) - val model25 = NAlgo3.Model(25, pd) - - val rand = new Random() - - val fakeEngineInstanceId = s"FakeInstanceId-${rand.nextLong()}" - - val persistedModels = engine.train( - sc, - engineParams, - engineInstanceId = 
fakeEngineInstanceId, - params = WorkflowParams() - ) - - val deployableModels = engine.prepareDeploy( - sc, - engineParams, - fakeEngineInstanceId, - persistedModels, - params = WorkflowParams() - ) - - deployableModels should contain theSameElementsAs Seq( - model20, model21, model22, model23, model24, model25) - } -} - -class EngineTrainSuite extends FunSuite with SharedSparkContext { - import io.prediction.controller.Engine0._ - val defaultWorkflowParams: WorkflowParams = WorkflowParams() - - test("Parallel DS/P/Algos") { - val models = Engine.train( - sc, - new PDataSource0(0), - new PPreparator0(1), - Seq( - new PAlgo0(2), - new PAlgo1(3), - new PAlgo0(4)), - defaultWorkflowParams - ) - - val pd = ProcessedData(1, TrainingData(0)) - - models should contain theSameElementsAs Seq( - PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd)) - } - - test("Local DS/P/Algos") { - val models = Engine.train( - sc, - new LDataSource0(0), - new LPreparator0(1), - Seq( - new LAlgo0(2), - new LAlgo1(3), - new LAlgo0(4)), - defaultWorkflowParams - ) - - val pd = ProcessedData(1, TrainingData(0)) - - val expectedResults = Seq( - LAlgo0.Model(2, pd), - LAlgo1.Model(3, pd), - LAlgo0.Model(4, pd)) - - forAll(models.zip(expectedResults)) { case (model, expected) => - model shouldBe a [RDD[_]] - val localModel = model.asInstanceOf[RDD[_]].collect - localModel should contain theSameElementsAs Seq(expected) - } - } - - test("P2L DS/P/Algos") { - val models = Engine.train( - sc, - new PDataSource0(0), - new PPreparator0(1), - Seq( - new NAlgo0(2), - new NAlgo1(3), - new NAlgo0(4)), - defaultWorkflowParams - ) - - val pd = ProcessedData(1, TrainingData(0)) - - models should contain theSameElementsAs Seq( - NAlgo0.Model(2, pd), NAlgo1.Model(3, pd), NAlgo0.Model(4, pd)) - } - - test("Parallel DS/P/Algos Stop-After-Read") { - val workflowParams = defaultWorkflowParams.copy( - stopAfterRead = true) - - an [StopAfterReadInterruption] should be thrownBy Engine.train( - sc, - new 
PDataSource0(0), - new PPreparator0(1), - Seq( - new PAlgo0(2), - new PAlgo1(3), - new PAlgo0(4)), - workflowParams - ) - } - - test("Parallel DS/P/Algos Stop-After-Prepare") { - val workflowParams = defaultWorkflowParams.copy( - stopAfterPrepare = true) - - an [StopAfterPrepareInterruption] should be thrownBy Engine.train( - sc, - new PDataSource0(0), - new PPreparator0(1), - Seq( - new PAlgo0(2), - new PAlgo1(3), - new PAlgo0(4)), - workflowParams - ) - } - - test("Parallel DS/P/Algos Dirty TrainingData") { - val workflowParams = defaultWorkflowParams.copy( - skipSanityCheck = false) - - an [AssertionError] should be thrownBy Engine.train( - sc, - new PDataSource3(0, error = true), - new PPreparator0(1), - Seq( - new PAlgo0(2), - new PAlgo1(3), - new PAlgo0(4)), - workflowParams - ) - } - - test("Parallel DS/P/Algos Dirty TrainingData But Skip Check") { - val workflowParams = defaultWorkflowParams.copy( - skipSanityCheck = true) - - val models = Engine.train( - sc, - new PDataSource3(0, error = true), - new PPreparator0(1), - Seq( - new PAlgo0(2), - new PAlgo1(3), - new PAlgo0(4)), - workflowParams - ) - - val pd = ProcessedData(1, TrainingData(0, error = true)) - - models should contain theSameElementsAs Seq( - PAlgo0.Model(2, pd), PAlgo1.Model(3, pd), PAlgo0.Model(4, pd)) - } -} - - -class EngineEvalSuite -extends FunSuite with Inside with SharedSparkContext { - import io.prediction.controller.Engine0._ - - @transient lazy val logger = Logger[this.type] - - test("Simple Parallel DS/P/A/S") { - val en = 2 - val qn = 5 - - val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = - Engine.eval( - sc, - new PDataSource1(id = 1, en = en, qn = qn), - new PPreparator0(id = 2), - Seq(new PAlgo0(id = 3)), - new LServing0(id = 10)) - - val pd = ProcessedData(2, TrainingData(1)) - val model0 = PAlgo0.Model(3, pd) - - forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { - val (evalInfo, qpaRDD) = evalData - evalInfo shouldBe EvalInfo(1) - - val qpaSeq: 
Seq[(Query, Prediction, Actual)] = qpaRDD.collect - forAll (qpaSeq) { case (q, p, a) => - val Query(qId, qEx, qQx, _) = q - val Actual(aId, aEx, aQx) = a - qId shouldBe aId - qEx shouldBe ex - aEx shouldBe ex - qQx shouldBe aQx - - inside (p) { case Prediction(pId, pQ, pModels, pPs) => { - pId shouldBe 10 - pQ shouldBe q - pModels shouldBe None - pPs should have size 1 - pPs shouldBe Seq( - Prediction(id = 3, q = q, models = Some(model0))) - }} - } - - }} - - } - - test("Parallel DS/P/A/S") { - val en = 2 - val qn = 5 - - val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = - Engine.eval( - sc, - new PDataSource1(id = 1, en = en, qn = qn), - new PPreparator0(id = 2), - Seq( - new PAlgo0(id = 3), - new PAlgo1(id = 4), - new NAlgo1(id = 5)), - new LServing0(id = 10)) - - val pd = ProcessedData(2, TrainingData(1)) - val model0 = PAlgo0.Model(3, pd) - val model1 = PAlgo1.Model(4, pd) - val model2 = NAlgo1.Model(5, pd) - - forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { - val (evalInfo, qpaRDD) = evalData - evalInfo shouldBe EvalInfo(1) - - val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect - forAll (qpaSeq) { case (q, p, a) => - val Query(qId, qEx, qQx, _) = q - val Actual(aId, aEx, aQx) = a - qId shouldBe aId - qEx shouldBe ex - aEx shouldBe ex - qQx shouldBe aQx - - inside (p) { case Prediction(pId, pQ, pModels, pPs) => { - pId shouldBe 10 - pQ shouldBe q - pModels shouldBe None - pPs should have size 3 - pPs shouldBe Seq( - Prediction(id = 3, q = q, models = Some(model0)), - Prediction(id = 4, q = q, models = Some(model1)), - Prediction(id = 5, q = q, models = Some(model2)) - ) - }} - } - }} - } - - test("Parallel DS/P/A/S with Supplemented Query") { - val en = 2 - val qn = 5 - - val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = - Engine.eval( - sc, - new PDataSource1(id = 1, en = en, qn = qn), - new PPreparator0(id = 2), - Seq( - new PAlgo0(id = 3), - new PAlgo1(id = 4), - new NAlgo1(id = 5)), - new 
LServing2(id = 10)) - - val pd = ProcessedData(2, TrainingData(1)) - val model0 = PAlgo0.Model(3, pd) - val model1 = PAlgo1.Model(4, pd) - val model2 = NAlgo1.Model(5, pd) - - forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { - val (evalInfo, qpaRDD) = evalData - evalInfo shouldBe EvalInfo(1) - - val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect - forAll (qpaSeq) { case (q, p, a) => - val Query(qId, qEx, qQx, qSupp) = q - val Actual(aId, aEx, aQx) = a - qId shouldBe aId - qEx shouldBe ex - aEx shouldBe ex - qQx shouldBe aQx - qSupp shouldBe false - - inside (p) { case Prediction(pId, pQ, pModels, pPs) => { - pId shouldBe 10 - pQ shouldBe q - pModels shouldBe None - pPs should have size 3 - // queries inside prediction should have supp set to true, since it - // represents what the algorithms see. - val qSupp = q.copy(supp = true) - pPs shouldBe Seq( - Prediction(id = 3, q = qSupp, models = Some(model0)), - Prediction(id = 4, q = qSupp, models = Some(model1)), - Prediction(id = 5, q = qSupp, models = Some(model2)) - ) - }} - } - }} - } - - test("Local DS/P/A/S") { - val en = 2 - val qn = 5 - - val evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = - Engine.eval( - sc, - new LDataSource0(id = 1, en = en, qn = qn), - new LPreparator0(id = 2), - Seq( - new LAlgo0(id = 3), - new LAlgo1(id = 4), - new LAlgo1(id = 5)), - new LServing0(id = 10)) - - val pd = ProcessedData(2, TrainingData(1)) - val model0 = LAlgo0.Model(3, pd) - val model1 = LAlgo1.Model(4, pd) - val model2 = LAlgo1.Model(5, pd) - - forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { - val (evalInfo, qpaRDD) = evalData - evalInfo shouldBe EvalInfo(1) - - val qpaSeq: Seq[(Query, Prediction, Actual)] = qpaRDD.collect - forAll (qpaSeq) { case (q, p, a) => - val Query(qId, qEx, qQx, _) = q - val Actual(aId, aEx, aQx) = a - qId shouldBe aId - qEx shouldBe ex - aEx shouldBe ex - qQx shouldBe aQx - - inside (p) { case Prediction(pId, pQ, pModels, pPs) => { - pId shouldBe 
10 - pQ shouldBe q - pModels shouldBe None - pPs should have size 3 - pPs shouldBe Seq( - Prediction(id = 3, q = q, models = Some(model0)), - Prediction(id = 4, q = q, models = Some(model1)), - Prediction(id = 5, q = q, models = Some(model2)) - ) - }} - } - - }} - - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EvaluationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/controller/EvaluationTest.scala b/core/src/test/scala/io/prediction/controller/EvaluationTest.scala deleted file mode 100644 index 5dc4c86..0000000 --- a/core/src/test/scala/io/prediction/controller/EvaluationTest.scala +++ /dev/null @@ -1,46 +0,0 @@ -package io.prediction.controller - -import io.prediction.workflow.SharedSparkContext - -import org.scalatest.FunSuite -import org.scalatest.Inside -import org.scalatest.Matchers._ - -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -object EvaluationSuite { - import io.prediction.controller.TestEvaluator._ - - class Metric0 extends Metric[EvalInfo, Query, Prediction, Actual, Int] { - def calculate( - sc: SparkContext, - evalDataSet: Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])]): Int = 1 - } - - object Evaluation0 extends Evaluation { - engineMetric = (new FakeEngine(1, 1, 1), new Metric0()) - } -} - - -class EvaluationSuite -extends FunSuite with Inside with SharedSparkContext { - import io.prediction.controller.EvaluationSuite._ - - test("Evaluation makes MetricEvaluator") { - // MetricEvaluator is typed [EvalInfo, Query, Prediction, Actual, Int], - // however this information is erased on JVM. scalatest doc recommends to - // use wildcards. 
- Evaluation0.evaluator shouldBe a [MetricEvaluator[_, _, _, _, _]] - } - - test("Load from class path") { - val r = io.prediction.workflow.WorkflowUtils.getEvaluation( - "io.prediction.controller.EvaluationSuite.Evaluation0", - getClass.getClassLoader) - - r._2 shouldBe EvaluationSuite.Evaluation0 - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala b/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala deleted file mode 100644 index c57bd03..0000000 --- a/core/src/test/scala/io/prediction/controller/EvaluatorTest.scala +++ /dev/null @@ -1,93 +0,0 @@ -package io.prediction.controller - -import io.prediction.core._ -import io.prediction.workflow.WorkflowParams - -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -object TestEvaluator { - case class EvalInfo(id: Int, ex: Int) - case class Query(id: Int, ex: Int, qx: Int) - case class Prediction(id: Int, ex: Int, qx: Int) - case class Actual(id: Int, ex: Int, qx: Int) - - class FakeEngine(val id: Int, val en: Int, val qn: Int) - extends BaseEngine[EvalInfo, Query, Prediction, Actual] { - def train( - sc: SparkContext, - engineParams: EngineParams, - instanceId: String = "", - params: WorkflowParams = WorkflowParams() - ): Seq[Any] = { - Seq[Any]() - } - - def eval( - sc: SparkContext, - engineParams: EngineParams, - params: WorkflowParams) - : Seq[(EvalInfo, RDD[(Query, Prediction, Actual)])] = { - (0 until en).map { ex => { - val qpas = (0 until qn).map { qx => { - (Query(id, ex, qx), Prediction(id, ex, qx), Actual(id, ex, qx)) - }} - - (EvalInfo(id = id, ex = ex), sc.parallelize(qpas)) - }} - } - - } - - /* - class Evaluator0 extends Evaluator[EvalInfo, Query, Prediction, Actual, - (Query, Prediction, Actual), - (EvalInfo, Seq[(Query, 
Prediction, Actual)]), - Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))] - ] { - - def evaluateUnit(q: Query, p: Prediction, a: Actual) - : (Query, Prediction, Actual) = (q, p, a) - - def evaluateSet( - evalInfo: EvalInfo, - eus: Seq[(Query, Prediction, Actual)]) - : (EvalInfo, Seq[(Query, Prediction, Actual)]) = (evalInfo, eus) - - def evaluateAll( - input: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))]) - = input - } - */ - -} - -/* -class EvaluatorSuite -extends FunSuite with Inside with SharedSparkContext { - import io.prediction.controller.TestEvaluator._ - @transient lazy val logger = Logger[this.type] - - test("Evaluator.evaluate") { - val engine = new FakeEngine(1, 3, 10) - val evaluator = new Evaluator0() - - val evalDataSet = engine.eval(sc, null.asInstanceOf[EngineParams]) - val er: Seq[(EvalInfo, (EvalInfo, Seq[(Query, Prediction, Actual)]))] = - evaluator.evaluateBase(sc, evalDataSet) - - evalDataSet.zip(er).map { case (input, output) => { - val (inputEvalInfo, inputQpaRDD) = input - val (outputEvalInfo, (outputEvalInfo2, outputQpaSeq)) = output - - inputEvalInfo shouldBe outputEvalInfo - inputEvalInfo shouldBe outputEvalInfo2 - - val inputQpaSeq: Array[(Query, Prediction, Actual)] = inputQpaRDD.collect - - inputQpaSeq.size should be (outputQpaSeq.size) - // TODO. match inputQpa and outputQpa content. 
- }} - } -} -*/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala b/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala deleted file mode 100644 index cdf4598..0000000 --- a/core/src/test/scala/io/prediction/controller/FastEvalEngineTest.scala +++ /dev/null @@ -1,181 +0,0 @@ -package io.prediction.controller - -import io.prediction.workflow.WorkflowParams -import org.scalatest.FunSuite -import org.scalatest.Inside -import org.scalatest.Matchers._ -import org.scalatest.Inspectors._ - -import io.prediction.workflow.SharedSparkContext - -class FastEngineSuite -extends FunSuite with Inside with SharedSparkContext { - import io.prediction.controller.Engine0._ - - test("Single Evaluation") { - val engine = new FastEvalEngine( - Map("" -> classOf[PDataSource2]), - Map("" -> classOf[PPreparator1]), - Map( - "PAlgo2" -> classOf[PAlgo2], - "PAlgo3" -> classOf[PAlgo3] - ), - Map("" -> classOf[LServing1])) - - val qn = 10 - val en = 3 - - val engineParams = EngineParams( - dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq( - ("PAlgo2", PAlgo2.Params(20)), - ("PAlgo2", PAlgo2.Params(21)), - ("PAlgo3", PAlgo3.Params(22)) - ), - servingParams = LServing1.Params(3)) - - val algoCount = engineParams.algorithmParamsList.size - val pd = ProcessedData(1, TrainingData(0)) - val model0 = PAlgo2.Model(20, pd) - val model1 = PAlgo2.Model(21, pd) - val model2 = PAlgo3.Model(22, pd) - - val evalDataSet = engine.eval(sc, engineParams, WorkflowParams()) - - evalDataSet should have size en - - forAll(evalDataSet.zipWithIndex) { case (evalData, ex) => { - val (evalInfo, qpaRDD) = evalData - evalInfo shouldBe EvalInfo(0) - - val qpaSeq: 
Seq[(Query, Prediction, Actual)] = qpaRDD.collect - - qpaSeq should have size qn - - forAll (qpaSeq) { case (q, p, a) => - val Query(qId, qEx, qQx, _) = q - val Actual(aId, aEx, aQx) = a - qId shouldBe aId - qEx shouldBe ex - aEx shouldBe ex - qQx shouldBe aQx - - inside (p) { case Prediction(pId, pQ, pModels, pPs) => { - pId shouldBe 3 - pQ shouldBe q - pModels shouldBe None - pPs should have size algoCount - pPs shouldBe Seq( - Prediction(id = 20, q = q, models = Some(model0)), - Prediction(id = 21, q = q, models = Some(model1)), - Prediction(id = 22, q = q, models = Some(model2)) - ) - }} - } - }} - } - - test("Batch Evaluation") { - val engine = new FastEvalEngine( - Map("" -> classOf[PDataSource2]), - Map("" -> classOf[PPreparator1]), - Map("" -> classOf[PAlgo2]), - Map("" -> classOf[LServing1])) - - val qn = 10 - val en = 3 - - val baseEngineParams = EngineParams( - dataSourceParams = PDataSource2.Params(id = 0, en = en, qn = qn), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq(("", PAlgo2.Params(2))), - servingParams = LServing1.Params(3)) - - val ep0 = baseEngineParams - val ep1 = baseEngineParams.copy( - algorithmParamsList = Seq(("", PAlgo2.Params(2)))) - val ep2 = baseEngineParams.copy( - algorithmParamsList = Seq(("", PAlgo2.Params(20)))) - - val engineEvalDataSet = engine.batchEval( - sc, - Seq(ep0, ep1, ep2), - WorkflowParams()) - - val evalDataSet0 = engineEvalDataSet(0)._2 - val evalDataSet1 = engineEvalDataSet(1)._2 - val evalDataSet2 = engineEvalDataSet(2)._2 - - evalDataSet0 shouldBe evalDataSet1 - evalDataSet0 should not be evalDataSet2 - evalDataSet1 should not be evalDataSet2 - - // evalDataSet0._1 should be theSameInstanceAs evalDataSet1._1 - // When things are cached correctly, evalDataSet0 and 1 should share the - // same EI - evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => { - e0._1 should be theSameInstanceAs e1._1 - e0._2 should be theSameInstanceAs e1._2 - }} - - // So as set1 and set2, however, the 
QPA-RDD should be different. - evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => { - e1._1 should be theSameInstanceAs e2._1 - val e1Qpa = e1._2 - val e2Qpa = e2._2 - e1Qpa should not be theSameInstanceAs (e2Qpa) - }} - } - - test("Not cached when isEqual not implemented") { - // PDataSource3.Params is a class not case class. Need to implement the - // isEqual function for hashing. - val engine = new FastEvalEngine( - Map("" -> classOf[PDataSource4]), - Map("" -> classOf[PPreparator1]), - Map("" -> classOf[PAlgo2]), - Map("" -> classOf[LServing1])) - - val qn = 10 - val en = 3 - - val baseEngineParams = EngineParams( - dataSourceParams = new PDataSource4.Params(id = 0, en = en, qn = qn), - preparatorParams = PPreparator1.Params(1), - algorithmParamsList = Seq(("", PAlgo2.Params(2))), - servingParams = LServing1.Params(3)) - - val ep0 = baseEngineParams - val ep1 = baseEngineParams.copy( - algorithmParamsList = Seq(("", PAlgo2.Params(3)))) - // ep2.dataSource is different from ep0. - val ep2 = baseEngineParams.copy( - dataSourceParams = ("", new PDataSource4.Params(id = 0, en = en, qn = qn)), - algorithmParamsList = Seq(("", PAlgo2.Params(3)))) - - val engineEvalDataSet = engine.batchEval( - sc, - Seq(ep0, ep1, ep2), - WorkflowParams()) - - val evalDataSet0 = engineEvalDataSet(0)._2 - val evalDataSet1 = engineEvalDataSet(1)._2 - val evalDataSet2 = engineEvalDataSet(2)._2 - - evalDataSet0 should not be evalDataSet1 - evalDataSet0 should not be evalDataSet2 - evalDataSet1 should not be evalDataSet2 - - // Set0 should have same EI as Set1, since their dsp are the same instance. 
- evalDataSet0.zip(evalDataSet1).foreach { case (e0, e1) => { - e0._1 should be theSameInstanceAs (e1._1) - }} - - // Set1 should have different EI as Set2, since Set2's dsp is another - // instance - evalDataSet1.zip(evalDataSet2).foreach { case (e1, e2) => { - e1._1 should not be theSameInstanceAs (e2._1) - }} - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala b/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala deleted file mode 100644 index 71fcb88..0000000 --- a/core/src/test/scala/io/prediction/controller/MetricEvaluatorTest.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.controller - -import io.prediction.workflow.SharedSparkContext -import io.prediction.workflow.WorkflowParams -import org.scalatest.FunSuite - -object MetricEvaluatorSuite { - case class Metric0() extends SumMetric[EmptyParams, Int, Int, Int, Int] { - def calculate(q: Int, p: Int, a: Int): Int = q - } - - object Evaluation0 extends Evaluation {} -} - -class MetricEvaluatorDevSuite extends FunSuite with SharedSparkContext { - import io.prediction.controller.MetricEvaluatorSuite._ - - test("a") { - val metricEvaluator = MetricEvaluator( - Metric0(), - Seq(Metric0(), Metric0()) - ) - - val engineEvalDataSet = Seq( - (EngineParams(), Seq( - (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0)))))), - (EngineParams(), Seq( - (EmptyParams(), sc.parallelize(Seq((1,0,0), (2,0,0))))))) - - val r = metricEvaluator.evaluateBase( - sc, - Evaluation0, - engineEvalDataSet, - WorkflowParams()) - - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/test/scala/io/prediction/controller/MetricTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/io/prediction/controller/MetricTest.scala b/core/src/test/scala/io/prediction/controller/MetricTest.scala deleted file mode 100644 index b846548..0000000 --- a/core/src/test/scala/io/prediction/controller/MetricTest.scala +++ /dev/null @@ -1,143 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.controller - -import io.prediction.workflow.SharedSparkContext - -import grizzled.slf4j.Logger -import org.scalatest.Matchers._ -import org.scalatest.FunSuite -import org.scalatest.Inside - -object MetricDevSuite { - class QIntSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Int] { - def calculate(q: Int, p: Int, a: Int): Int = q - } - - class QDoubleSumMetric extends SumMetric[EmptyParams, Int, Int, Int, Double] { - def calculate(q: Int, p: Int, a: Int): Double = q.toDouble - } - - class QAverageMetric extends AverageMetric[EmptyParams, Int, Int, Int] { - def calculate(q: Int, p: Int, a: Int): Double = q.toDouble - } - - class QOptionAverageMetric extends OptionAverageMetric[EmptyParams, Int, Int, Int] { - def calculate(q: Int, p: Int, a: Int): Option[Double] = { - if (q < 0) { None } else { Some(q.toDouble) } - } - } - - class QStdevMetric extends StdevMetric[EmptyParams, Int, Int, Int] { - def calculate(q: Int, p: Int, a: Int): Double = q.toDouble - } - - class QOptionStdevMetric extends OptionStdevMetric[EmptyParams, Int, Int, Int] { - def calculate(q: Int, p: Int, a: Int): Option[Double] = { - if (q < 0) { None } else { Some(q.toDouble) } - } - } - -} - -class MetricDevSuite -extends FunSuite with Inside with SharedSparkContext { - @transient lazy val logger = Logger[this.type] - - test("Average Metric") { - val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) - val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0)) - - val evalDataSet = Seq( - (EmptyParams(), sc.parallelize(qpaSeq0)), - (EmptyParams(), sc.parallelize(qpaSeq1))) - - val m = new MetricDevSuite.QAverageMetric() - val result = m.calculate(sc, evalDataSet) - - result shouldBe (21.0 / 6) - } - - test("Option Average Metric") { - val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) - val qpaSeq1 = Seq((-4, 0, 0), (-5, 0, 0), (6, 0, 0)) - - val evalDataSet = Seq( - 
(EmptyParams(), sc.parallelize(qpaSeq0)), - (EmptyParams(), sc.parallelize(qpaSeq1))) - - val m = new MetricDevSuite.QOptionAverageMetric() - val result = m.calculate(sc, evalDataSet) - - result shouldBe (12.0 / 4) - } - - test("Stdev Metric") { - val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0)) - val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0)) - - val evalDataSet = Seq( - (EmptyParams(), sc.parallelize(qpaSeq0)), - (EmptyParams(), sc.parallelize(qpaSeq1))) - - val m = new MetricDevSuite.QStdevMetric() - val result = m.calculate(sc, evalDataSet) - - result shouldBe 2.0 - } - - test("Option Stdev Metric") { - val qpaSeq0 = Seq((1, 0, 0), (1, 0, 0), (1, 0, 0), (1, 0, 0)) - val qpaSeq1 = Seq((5, 0, 0), (5, 0, 0), (5, 0, 0), (5, 0, 0), (-5, 0, 0)) - - val evalDataSet = Seq( - (EmptyParams(), sc.parallelize(qpaSeq0)), - (EmptyParams(), sc.parallelize(qpaSeq1))) - - val m = new MetricDevSuite.QOptionStdevMetric() - val result = m.calculate(sc, evalDataSet) - - result shouldBe 2.0 - } - - test("Sum Metric [Int]") { - val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) - val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0)) - - val evalDataSet = Seq( - (EmptyParams(), sc.parallelize(qpaSeq0)), - (EmptyParams(), sc.parallelize(qpaSeq1))) - - val m = new MetricDevSuite.QIntSumMetric() - val result = m.calculate(sc, evalDataSet) - - result shouldBe 21 - } - - test("Sum Metric [Double]") { - val qpaSeq0 = Seq((1, 0, 0), (2, 0, 0), (3, 0, 0)) - val qpaSeq1 = Seq((4, 0, 0), (5, 0, 0), (6, 0, 0)) - - val evalDataSet = Seq( - (EmptyParams(), sc.parallelize(qpaSeq0)), - (EmptyParams(), sc.parallelize(qpaSeq1))) - - val m = new MetricDevSuite.QDoubleSumMetric() - val result = m.calculate(sc, evalDataSet) - - result shouldBe 21.0 - } -}
