http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala b/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala deleted file mode 100644 index af5aa14..0000000 --- a/core/src/main/scala/io/prediction/workflow/CreateWorkflow.scala +++ /dev/null @@ -1,274 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import java.net.URI - -import com.github.nscala_time.time.Imports._ -import com.google.common.io.ByteStreams -import grizzled.slf4j.Logging -import io.prediction.controller.Engine -import io.prediction.core.BaseEngine -import io.prediction.data.storage.EngineInstance -import io.prediction.data.storage.EvaluationInstance -import io.prediction.data.storage.Storage -import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path -import org.json4s.JValue -import org.json4s.JString -import org.json4s.native.JsonMethods.parse - -import scala.language.existentials - -object CreateWorkflow extends Logging { - - case class WorkflowConfig( - deployMode: String = "", - batch: String = "", - engineId: String = "", - engineVersion: String = "", - engineVariant: String = "", - engineFactory: String = "", - engineParamsKey: String = "", - evaluationClass: Option[String] = None, - engineParamsGeneratorClass: Option[String] = None, - env: Option[String] = None, - skipSanityCheck: Boolean = false, - stopAfterRead: Boolean = false, - stopAfterPrepare: Boolean = false, - verbosity: Int = 0, - verbose: Boolean = false, - debug: Boolean = false, - logFile: Option[String] = None, - jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) - - case class AlgorithmParams(name: String, params: JValue) - - private def stringFromFile(filePath: String): String = { - try { - val uri = new URI(filePath) - val fs = FileSystem.get(uri, new Configuration()) - new String(ByteStreams.toByteArray(fs.open(new Path(uri))).map(_.toChar)) - } catch { - case e: java.io.IOException => - error(s"Error reading from file: ${e.getMessage}. Aborting workflow.") - sys.exit(1) - } - } - - val parser = new scopt.OptionParser[WorkflowConfig]("CreateWorkflow") { - override def errorOnUnknownArgument: Boolean = false - opt[String]("batch") action { (x, c) => - c.copy(batch = x) - } text("Batch label of the workflow run.") - opt[String]("engine-id") required() action { (x, c) => - c.copy(engineId = x) - } text("Engine's ID.") - opt[String]("engine-version") required() action { (x, c) => - c.copy(engineVersion = x) - } text("Engine's version.") - opt[String]("engine-variant") required() action { (x, c) => - c.copy(engineVariant = x) - } text("Engine variant JSON.") - opt[String]("evaluation-class") action { (x, c) => - c.copy(evaluationClass = Some(x)) - } text("Class name of the run's evaluator.") - opt[String]("engine-params-generator-class") action { (x, c) => - c.copy(engineParamsGeneratorClass = Some(x)) - } text("Path to evaluator parameters") - opt[String]("env") action { (x, c) => - c.copy(env = Some(x)) - } text("Comma-separated list of environmental variables (in 'FOO=BAR' " + - "format) to pass to the Spark execution environment.") - opt[Unit]("verbose") action { (x, c) => - c.copy(verbose = true) - } text("Enable verbose output.") - opt[Unit]("debug") action { (x, c) => - c.copy(debug = true) - } text("Enable debug output.") - opt[Unit]("skip-sanity-check") action { (x, c) => - c.copy(skipSanityCheck = true) - } - opt[Unit]("stop-after-read") action { (x, c) => - c.copy(stopAfterRead = true) - } - opt[Unit]("stop-after-prepare") action { (x, c) => - c.copy(stopAfterPrepare = true) - } - opt[String]("deploy-mode") action { (x, c) => - c.copy(deployMode = x) - } - opt[Int]("verbosity") action { (x, c) => - c.copy(verbosity = x) - } - opt[String]("engine-factory") action { (x, c) => - c.copy(engineFactory = x) - } - opt[String]("engine-params-key") action { (x, c) => - c.copy(engineParamsKey = x) - } - opt[String]("log-file") action { (x, c) => - c.copy(logFile = Some(x)) - } - opt[String]("json-extractor") action { (x, c) => - c.copy(jsonExtractor = JsonExtractorOption.withName(x)) - } - } - - def main(args: Array[String]): Unit = { - val wfcOpt = parser.parse(args, WorkflowConfig()) - if (wfcOpt.isEmpty) { - logger.error("WorkflowConfig is empty. Quitting") - return - } - - val wfc = wfcOpt.get - - WorkflowUtils.modifyLogging(wfc.verbose) - - val evaluation = wfc.evaluationClass.map { ec => - try { - WorkflowUtils.getEvaluation(ec, getClass.getClassLoader)._2 - } catch { - case e @ (_: ClassNotFoundException | _: NoSuchMethodException) => - error(s"Unable to obtain evaluation $ec. Aborting workflow.", e) - sys.exit(1) - } - } - - val engineParamsGenerator = wfc.engineParamsGeneratorClass.map { epg => - try { - WorkflowUtils.getEngineParamsGenerator(epg, getClass.getClassLoader)._2 - } catch { - case e @ (_: ClassNotFoundException | _: NoSuchMethodException) => - error(s"Unable to obtain engine parameters generator $epg. " + - "Aborting workflow.", e) - sys.exit(1) - } - } - - val pioEnvVars = wfc.env.map(e => - e.split(',').flatMap(p => - p.split('=') match { - case Array(k, v) => List(k -> v) - case _ => Nil - } - ).toMap - ).getOrElse(Map()) - - if (evaluation.isEmpty) { - val variantJson = parse(stringFromFile(wfc.engineVariant)) - val engineFactory = if (wfc.engineFactory == "") { - variantJson \ "engineFactory" match { - case JString(s) => s - case _ => - error("Unable to read engine factory class name from " + - s"${wfc.engineVariant}. Aborting.") - sys.exit(1) - } - } else wfc.engineFactory - val variantId = variantJson \ "id" match { - case JString(s) => s - case _ => - error("Unable to read engine variant ID from " + - s"${wfc.engineVariant}. Aborting.") - sys.exit(1) - } - val (engineLanguage, engineFactoryObj) = try { - WorkflowUtils.getEngine(engineFactory, getClass.getClassLoader) - } catch { - case e @ (_: ClassNotFoundException | _: NoSuchMethodException) => - error(s"Unable to obtain engine: ${e.getMessage}. Aborting workflow.") - sys.exit(1) - } - - val engine: BaseEngine[_, _, _, _] = engineFactoryObj() - - val customSparkConf = WorkflowUtils.extractSparkConf(variantJson) - val workflowParams = WorkflowParams( - verbose = wfc.verbosity, - skipSanityCheck = wfc.skipSanityCheck, - stopAfterRead = wfc.stopAfterRead, - stopAfterPrepare = wfc.stopAfterPrepare, - sparkEnv = WorkflowParams().sparkEnv ++ customSparkConf) - - // Evaluator Not Specified. Do training. - if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) { - throw new NoSuchMethodException(s"Engine $engine is not trainable") - } - - val trainableEngine = engine.asInstanceOf[Engine[_, _, _, _, _, _]] - - val engineParams = if (wfc.engineParamsKey == "") { - trainableEngine.jValueToEngineParams(variantJson, wfc.jsonExtractor) - } else { - engineFactoryObj.engineParams(wfc.engineParamsKey) - } - - val engineInstance = EngineInstance( - id = "", - status = "INIT", - startTime = DateTime.now, - endTime = DateTime.now, - engineId = wfc.engineId, - engineVersion = wfc.engineVersion, - engineVariant = variantId, - engineFactory = engineFactory, - batch = wfc.batch, - env = pioEnvVars, - sparkConf = workflowParams.sparkEnv, - dataSourceParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.dataSourceParams), - preparatorParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.preparatorParams), - algorithmsParams = - JsonExtractor.paramsToJson(wfc.jsonExtractor, engineParams.algorithmParamsList), - servingParams = - JsonExtractor.paramToJson(wfc.jsonExtractor, engineParams.servingParams)) - - val engineInstanceId = Storage.getMetaDataEngineInstances.insert( - engineInstance) - - CoreWorkflow.runTrain( - env = pioEnvVars, - params = workflowParams, - engine = trainableEngine, - engineParams = engineParams, - engineInstance = engineInstance.copy(id = engineInstanceId)) - } else { - val workflowParams = WorkflowParams( - verbose = wfc.verbosity, - skipSanityCheck = wfc.skipSanityCheck, - stopAfterRead = wfc.stopAfterRead, - stopAfterPrepare = wfc.stopAfterPrepare, - sparkEnv = WorkflowParams().sparkEnv) - val evaluationInstance = EvaluationInstance( - evaluationClass = wfc.evaluationClass.get, - engineParamsGeneratorClass = wfc.engineParamsGeneratorClass.get, - batch = wfc.batch, - env = pioEnvVars, - sparkConf = workflowParams.sparkEnv - ) - Workflow.runEvaluation( - evaluation = evaluation.get, - engineParamsGenerator = engineParamsGenerator.get, - evaluationInstance = evaluationInstance, - params = workflowParams) - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala b/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala deleted file mode 100644 index 5b2649c..0000000 --- a/core/src/main/scala/io/prediction/workflow/EngineServerPlugin.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import io.prediction.data.storage.EngineInstance -import org.json4s._ - -trait EngineServerPlugin { - val pluginName: String - val pluginDescription: String - val pluginType: String - - def start(context: EngineServerPluginContext): Unit - - def process( - engineInstance: EngineInstance, - query: JValue, - prediction: JValue, - context: EngineServerPluginContext): JValue - - def handleREST(arguments: Seq[String]): String -} - -object EngineServerPlugin { - val outputBlocker = "outputblocker" - val outputSniffer = "outputsniffer" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala b/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala deleted file mode 100644 index eb04c6f..0000000 --- a/core/src/main/scala/io/prediction/workflow/EngineServerPluginContext.scala +++ /dev/null @@ -1,88 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import java.net.URI -import java.util.ServiceLoader - -import akka.event.LoggingAdapter -import com.google.common.io.ByteStreams -import grizzled.slf4j.Logging -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path -import org.json4s.DefaultFormats -import org.json4s.Formats -import org.json4s.JObject -import org.json4s.JValue -import org.json4s.native.JsonMethods._ - -import scala.collection.JavaConversions._ -import scala.collection.mutable - -class EngineServerPluginContext( - val plugins: mutable.Map[String, mutable.Map[String, EngineServerPlugin]], - val pluginParams: mutable.Map[String, JValue], - val log: LoggingAdapter) { - def outputBlockers: Map[String, EngineServerPlugin] = - plugins.getOrElse(EngineServerPlugin.outputBlocker, Map()).toMap - def outputSniffers: Map[String, EngineServerPlugin] = - plugins.getOrElse(EngineServerPlugin.outputSniffer, Map()).toMap -} - -object EngineServerPluginContext extends Logging { - implicit val formats: Formats = DefaultFormats - - def apply(log: LoggingAdapter, engineVariant: String): EngineServerPluginContext = { - val plugins = mutable.Map[String, mutable.Map[String, EngineServerPlugin]]( - EngineServerPlugin.outputBlocker -> mutable.Map(), - EngineServerPlugin.outputSniffer -> mutable.Map()) - val pluginParams = mutable.Map[String, JValue]() - val serviceLoader = ServiceLoader.load(classOf[EngineServerPlugin]) - val variantJson = parse(stringFromFile(engineVariant)) - (variantJson \ "plugins").extractOpt[JObject].foreach { pluginDefs => - pluginDefs.obj.foreach { pluginParams += _ } - } - serviceLoader foreach { service => - pluginParams.get(service.pluginName) map { params => - if ((params \ "enabled").extractOrElse(false)) { - info(s"Plugin ${service.pluginName} is enabled.") - plugins(service.pluginType) += service.pluginName -> service - } else { - info(s"Plugin ${service.pluginName} is disabled.") - } - } getOrElse { - info(s"Plugin ${service.pluginName} is disabled.") - } - } - new EngineServerPluginContext( - plugins, - pluginParams, - log) - } - - private def stringFromFile(filePath: String): String = { - try { - val uri = new URI(filePath) - val fs = FileSystem.get(uri, new Configuration()) - new String(ByteStreams.toByteArray(fs.open(new Path(uri))).map(_.toChar)) - } catch { - case e: java.io.IOException => - error(s"Error reading from file: ${e.getMessage}. Aborting.") - sys.exit(1) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala b/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala deleted file mode 100644 index a346d8e..0000000 --- a/core/src/main/scala/io/prediction/workflow/EngineServerPluginsActor.scala +++ /dev/null @@ -1,46 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import akka.actor.Actor -import akka.event.Logging -import io.prediction.data.storage.EngineInstance -import org.json4s.JValue - -class PluginsActor(engineVariant: String) extends Actor { - implicit val system = context.system - val log = Logging(system, this) - - val pluginContext = EngineServerPluginContext(log, engineVariant) - - def receive: PartialFunction[Any, Unit] = { - case (ei: EngineInstance, q: JValue, p: JValue) => - pluginContext.outputSniffers.values.foreach(_.process(ei, q, p, pluginContext)) - case h: PluginsActor.HandleREST => - try { - sender() ! pluginContext.outputSniffers(h.pluginName).handleREST(h.pluginArgs) - } catch { - case e: Exception => - sender() ! s"""{"message":"${e.getMessage}"}""" - } - case _ => - log.error("Unknown message sent to the Engine Server output sniffer plugin host.") - } -} - -object PluginsActor { - case class HandleREST(pluginName: String, pluginArgs: Seq[String]) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala b/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala deleted file mode 100644 index ed70d87..0000000 --- a/core/src/main/scala/io/prediction/workflow/EvaluationWorkflow.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import io.prediction.controller.EngineParams -import io.prediction.controller.Evaluation -import io.prediction.core.BaseEvaluator -import io.prediction.core.BaseEvaluatorResult -import io.prediction.core.BaseEngine - -import grizzled.slf4j.Logger -import org.apache.spark.SparkContext - -import scala.language.existentials - -object EvaluationWorkflow { - @transient lazy val logger = Logger[this.type] - def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult]( - sc: SparkContext, - evaluation: Evaluation, - engine: BaseEngine[EI, Q, P, A], - engineParamsList: Seq[EngineParams], - evaluator: BaseEvaluator[EI, Q, P, A, R], - params: WorkflowParams) - : R = { - val engineEvalDataSet = engine.batchEval(sc, engineParamsList, params) - evaluator.evaluateBase(sc, evaluation, engineEvalDataSet, params) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala b/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala deleted file mode 100644 index 350a430..0000000 --- a/core/src/main/scala/io/prediction/workflow/FakeWorkflow.scala +++ /dev/null @@ -1,106 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import io.prediction.annotation.Experimental -// FIXME(yipjustin): Remove wildcard import. -import io.prediction.core._ -import io.prediction.controller._ - -import grizzled.slf4j.Logger -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - - -@Experimental -private[prediction] class FakeEngine -extends BaseEngine[EmptyParams, EmptyParams, EmptyParams, EmptyParams] { - @transient lazy val logger = Logger[this.type] - - def train( - sc: SparkContext, - engineParams: EngineParams, - engineInstanceId: String, - params: WorkflowParams): Seq[Any] = { - throw new StopAfterReadInterruption() - } - - def eval( - sc: SparkContext, - engineParams: EngineParams, - params: WorkflowParams) - : Seq[(EmptyParams, RDD[(EmptyParams, EmptyParams, EmptyParams)])] = { - return Seq[(EmptyParams, RDD[(EmptyParams, EmptyParams, EmptyParams)])]() - } -} - -@Experimental -private[prediction] class FakeRunner(f: (SparkContext => Unit)) - extends BaseEvaluator[EmptyParams, EmptyParams, EmptyParams, EmptyParams, - FakeEvalResult] { - @transient private lazy val logger = Logger[this.type] - def evaluateBase( - sc: SparkContext, - evaluation: Evaluation, - engineEvalDataSet: - Seq[(EngineParams, Seq[(EmptyParams, RDD[(EmptyParams, EmptyParams, EmptyParams)])])], - params: WorkflowParams): FakeEvalResult = { - f(sc) - FakeEvalResult() - } -} - -@Experimental -private[prediction] case class FakeEvalResult() extends BaseEvaluatorResult { - override val noSave: Boolean = true -} - -/** FakeRun allows user to implement custom function under the exact enviroment - * as other PredictionIO workflow. - * - * Useful for developing new features. Only need to extend this trait and - * implement a function: (SparkContext => Unit). For example, the code below - * can be run with `pio eval HelloWorld`. - * - * {{{ - * object HelloWorld extends FakeRun { - * // func defines the function pio runs, must have signature (SparkContext => Unit). - * func = f - * - * def f(sc: SparkContext): Unit { - * val logger = Logger[this.type] - * logger.info("HelloWorld") - * } - * } - * }}} - * - */ -@Experimental -trait FakeRun extends Evaluation with EngineParamsGenerator { - private[this] var _runner: FakeRunner = _ - - def runner: FakeRunner = _runner - def runner_=(r: FakeRunner) { - engineEvaluator = (new FakeEngine(), r) - engineParamsList = Seq(new EngineParams()) - } - - def func: (SparkContext => Unit) = { (sc: SparkContext) => Unit } - def func_=(f: SparkContext => Unit) { - runner = new FakeRunner(f) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala b/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala deleted file mode 100644 index 7034063..0000000 --- a/core/src/main/scala/io/prediction/workflow/JsonExtractor.scala +++ /dev/null @@ -1,164 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import com.google.gson.Gson -import com.google.gson.GsonBuilder -import com.google.gson.TypeAdapterFactory -import io.prediction.controller.EngineParams -import io.prediction.controller.Params -import io.prediction.controller.Utils -import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption -import org.json4s.Extraction -import org.json4s.Formats -import org.json4s.JsonAST.{JArray, JValue} -import org.json4s.native.JsonMethods.compact -import org.json4s.native.JsonMethods.pretty -import org.json4s.native.JsonMethods.parse -import org.json4s.native.JsonMethods.render -import org.json4s.reflect.TypeInfo - -object JsonExtractor { - - def toJValue( - extractorOption: JsonExtractorOption, - o: Any, - json4sFormats: Formats = Utils.json4sDefaultFormats, - gsonTypeAdapterFactories: Seq[TypeAdapterFactory] = Seq.empty[TypeAdapterFactory]): JValue = { - - extractorOption match { - case JsonExtractorOption.Both => - - val json4sResult = Extraction.decompose(o)(json4sFormats) - json4sResult.children.size match { - case 0 => parse(gson(gsonTypeAdapterFactories).toJson(o)) - case _ => json4sResult - } - case JsonExtractorOption.Json4sNative => - Extraction.decompose(o)(json4sFormats) - case JsonExtractorOption.Gson => - parse(gson(gsonTypeAdapterFactories).toJson(o)) - } - } - - def extract[T]( - extractorOption: JsonExtractorOption, - json: String, - clazz: Class[T], - json4sFormats: Formats = Utils.json4sDefaultFormats, - gsonTypeAdapterFactories: Seq[TypeAdapterFactory] = Seq.empty[TypeAdapterFactory]): T = { - - extractorOption match { - case JsonExtractorOption.Both => - try { - extractWithJson4sNative(json, json4sFormats, clazz) - } catch { - case e: Exception => - extractWithGson(json, clazz, gsonTypeAdapterFactories) - } - case JsonExtractorOption.Json4sNative => - extractWithJson4sNative(json, json4sFormats, clazz) - case JsonExtractorOption.Gson => - extractWithGson(json, clazz, gsonTypeAdapterFactories) - } - } - - def paramToJson(extractorOption: JsonExtractorOption, param: (String, Params)): String = { - // to be replaced JValue needs to be done by Json4s, otherwise the tuple JValue will be wrong - val toBeReplacedJValue = - JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, (param._1, null)) - val paramJValue = JsonExtractor.toJValue(extractorOption, param._2) - - compact(render(toBeReplacedJValue.replace(param._1 :: Nil, paramJValue))) - } - - def paramsToJson(extractorOption: JsonExtractorOption, params: Seq[(String, Params)]): String = { - compact(render(paramsToJValue(extractorOption, params))) - } - - def engineParamsToJson(extractorOption: JsonExtractorOption, params: EngineParams) : String = { - compact(render(engineParamsToJValue(extractorOption, params))) - } - - def engineParamstoPrettyJson( - extractorOption: JsonExtractorOption, - params: EngineParams) : String = { - - pretty(render(engineParamsToJValue(extractorOption, params))) - } - - private def engineParamsToJValue(extractorOption: JsonExtractorOption, params: EngineParams) = { - var jValue = toJValue(JsonExtractorOption.Json4sNative, params) - - val dataSourceParamsJValue = toJValue(extractorOption, params.dataSourceParams._2) - jValue = jValue.replace( - "dataSourceParams" :: params.dataSourceParams._1 :: Nil, - dataSourceParamsJValue) - - val preparatorParamsJValue = toJValue(extractorOption, params.preparatorParams._2) - jValue = jValue.replace( - "preparatorParams" :: params.preparatorParams._1 :: Nil, - preparatorParamsJValue) - - val algorithmParamsJValue = paramsToJValue(extractorOption, params.algorithmParamsList) - jValue = jValue.replace("algorithmParamsList" :: Nil, algorithmParamsJValue) - - val servingParamsJValue = toJValue(extractorOption, params.servingParams._2) - jValue = jValue.replace("servingParams" :: params.servingParams._1 :: Nil, servingParamsJValue) - - jValue - } - - private - def paramsToJValue(extractorOption: JsonExtractorOption, params: Seq[(String, Params)]) = { - val jValues = params.map { case (name, param) => - // to be replaced JValue needs to be done by Json4s, otherwise the tuple JValue will be wrong - val toBeReplacedJValue = - JsonExtractor.toJValue(JsonExtractorOption.Json4sNative, (name, null)) - val paramJValue = JsonExtractor.toJValue(extractorOption, param) - - toBeReplacedJValue.replace(name :: Nil, paramJValue) - } - - JArray(jValues.toList) - } - - private def extractWithJson4sNative[T]( - json: String, - formats: Formats, - clazz: Class[T]): T = { - - Extraction.extract(parse(json), TypeInfo(clazz, None))(formats).asInstanceOf[T] - } - - private def extractWithGson[T]( - json: String, - clazz: Class[T], - gsonTypeAdapterFactories: Seq[TypeAdapterFactory]): T = { - - gson(gsonTypeAdapterFactories).fromJson(json, clazz) - } - - private def gson(gsonTypeAdapterFactories: Seq[TypeAdapterFactory]): Gson = { - val gsonBuilder = new GsonBuilder() - gsonTypeAdapterFactories.foreach { typeAdapterFactory => - gsonBuilder.registerTypeAdapterFactory(typeAdapterFactory) - } - - gsonBuilder.create() - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala b/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala deleted file mode 100644 index 60272fb..0000000 --- a/core/src/main/scala/io/prediction/workflow/JsonExtractorOption.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -object JsonExtractorOption extends Enumeration { - type JsonExtractorOption = Value - val Json4sNative = Value - val Gson = Value - val Both = Value -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala b/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala deleted file mode 100644 index c1c0a6d..0000000 --- a/core/src/main/scala/io/prediction/workflow/PersistentModelManifest.scala +++ /dev/null @@ -1,18 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -case class PersistentModelManifest(className: String) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/Workflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/Workflow.scala b/core/src/main/scala/io/prediction/workflow/Workflow.scala deleted file mode 100644 index c0543ab..0000000 --- a/core/src/main/scala/io/prediction/workflow/Workflow.scala +++ /dev/null @@ -1,135 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import io.prediction.annotation.Experimental -import io.prediction.controller.EngineParams -import io.prediction.controller.EngineParamsGenerator -import io.prediction.controller.Evaluation -import io.prediction.core.BaseEngine -import io.prediction.core.BaseEvaluator -import io.prediction.core.BaseEvaluatorResult -import io.prediction.data.storage.EvaluationInstance - -/** Collection of workflow creation methods. - * @group Workflow - */ -object Workflow { - // evaluator is already instantiated. - // This is an undocumented way of using evaluator. Still experimental. - // evaluatorParams is used to write into EngineInstance, will be shown in - // dashboard. - /* - def runEval[EI, Q, P, A, ER <: AnyRef]( - engine: BaseEngine[EI, Q, P, A], - engineParams: EngineParams, - evaluator: BaseEvaluator[EI, Q, P, A, ER], - evaluatorParams: Params, - env: Map[String, String] = WorkflowUtils.pioEnvVars, - params: WorkflowParams = WorkflowParams()) { - - implicit lazy val formats = Utils.json4sDefaultFormats + - new NameParamsSerializer - - val engineInstance = EngineInstance( - id = "", - status = "INIT", - startTime = DateTime.now, - endTime = DateTime.now, - engineId = "", - engineVersion = "", - engineVariant = "", - engineFactory = "FIXME", - evaluatorClass = evaluator.getClass.getName(), - batch = params.batch, - env = env, - sparkConf = params.sparkEnv, - dataSourceParams = write(engineParams.dataSourceParams), - preparatorParams = write(engineParams.preparatorParams), - algorithmsParams = write(engineParams.algorithmParamsList), - servingParams = write(engineParams.servingParams), - evaluatorParams = write(evaluatorParams), - evaluatorResults = "", - evaluatorResultsHTML = "", - evaluatorResultsJSON = "") - - CoreWorkflow.runEval( - engine = engine, - engineParams = engineParams, - engineInstance = engineInstance, - evaluator = evaluator, - evaluatorParams = evaluatorParams, - env = env, - params = params) - } - */ - - def runEvaluation( - evaluation: Evaluation, - engineParamsGenerator: EngineParamsGenerator, - env: Map[String, String] = WorkflowUtils.pioEnvVars, - evaluationInstance: EvaluationInstance = EvaluationInstance(), - params: WorkflowParams = WorkflowParams()) { - runEvaluationTypeless( - evaluation = evaluation, - engine = evaluation.engine, - engineParamsList = engineParamsGenerator.engineParamsList, - evaluationInstance = evaluationInstance, - evaluator = evaluation.evaluator, - env = env, - params = params - ) - } - - def runEvaluationTypeless[ - EI, Q, P, A, EEI, EQ, EP, EA, ER <: BaseEvaluatorResult]( - evaluation: Evaluation, - engine: BaseEngine[EI, Q, P, A], - engineParamsList: Seq[EngineParams], - evaluationInstance: EvaluationInstance, - evaluator: BaseEvaluator[EEI, EQ, EP, EA, ER], - env: Map[String, String] = WorkflowUtils.pioEnvVars, - params: WorkflowParams = WorkflowParams()) { - runEvaluationViaCoreWorkflow( - evaluation = evaluation, - engine = engine, - engineParamsList = engineParamsList, - evaluationInstance = evaluationInstance, - evaluator = evaluator.asInstanceOf[BaseEvaluator[EI, Q, P, A, ER]], - env = env, - params = params) - } - - /** :: Experimental :: */ - @Experimental - def runEvaluationViaCoreWorkflow[EI, Q, P, A, R <: BaseEvaluatorResult]( - evaluation: Evaluation, - engine: BaseEngine[EI, Q, P, A], - engineParamsList: Seq[EngineParams], - evaluationInstance: EvaluationInstance, - evaluator: BaseEvaluator[EI, Q, P, A, R], - env: Map[String, String] = WorkflowUtils.pioEnvVars, - params: WorkflowParams = WorkflowParams()) { - CoreWorkflow.runEvaluation( - evaluation = evaluation, - engine = engine, - engineParamsList = engineParamsList, - evaluationInstance = evaluationInstance, - evaluator = evaluator, - env = env, - params = params) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala b/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala deleted file mode 100644 index 264c757..0000000 --- a/core/src/main/scala/io/prediction/workflow/WorkflowContext.scala +++ /dev/null @@ -1,45 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import grizzled.slf4j.Logging -import org.apache.spark.SparkContext -import org.apache.spark.SparkConf - -import scala.language.existentials - -// FIXME: move to better location. -object WorkflowContext extends Logging { - def apply( - batch: String = "", - executorEnv: Map[String, String] = Map(), - sparkEnv: Map[String, String] = Map(), - mode: String = "" - ): SparkContext = { - val conf = new SparkConf() - val prefix = if (mode == "") "PredictionIO" else s"PredictionIO ${mode}" - conf.setAppName(s"${prefix}: ${batch}") - debug(s"Executor environment received: ${executorEnv}") - executorEnv.map(kv => conf.setExecutorEnv(kv._1, kv._2)) - debug(s"SparkConf executor environment: ${conf.getExecutorEnv}") - debug(s"Application environment received: ${sparkEnv}") - conf.setAll(sparkEnv) - val sparkConfString = conf.getAll.toSeq - debug(s"SparkConf environment: $sparkConfString") - new SparkContext(conf) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala b/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala deleted file mode 100644 index 88ec54e..0000000 --- a/core/src/main/scala/io/prediction/workflow/WorkflowParams.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -/** Workflow parameters. - * - * @param batch Batch label of the run. - * @param verbose Verbosity level. - * @param saveModel Controls whether trained models are persisted. - * @param sparkEnv Spark properties that will be set in SparkConf.setAll(). - * @param skipSanityCheck Skips all data sanity check. - * @param stopAfterRead Stops workflow after reading from data source. - * @param stopAfterPrepare Stops workflow after data preparation. - * @group Workflow - */ -case class WorkflowParams( - batch: String = "", - verbose: Int = 2, - saveModel: Boolean = true, - sparkEnv: Map[String, String] = - Map[String, String]("spark.executor.extraClassPath" -> "."), - skipSanityCheck: Boolean = false, - stopAfterRead: Boolean = false, - stopAfterPrepare: Boolean = false) { - // Temporary workaround for WorkflowParamsBuilder for Java. It doesn't support - // custom spark environment yet. - def this(batch: String, verbose: Int, saveModel: Boolean) - = this(batch, verbose, saveModel, Map[String, String]()) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala b/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala deleted file mode 100644 index d93b9eb..0000000 --- a/core/src/main/scala/io/prediction/workflow/WorkflowUtils.scala +++ /dev/null @@ -1,419 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import java.io.File -import java.io.FileNotFoundException - -import io.prediction.controller.EmptyParams -import io.prediction.controller.EngineFactory -import io.prediction.controller.EngineParamsGenerator -import io.prediction.controller.Evaluation -import io.prediction.controller.Params -import io.prediction.controller.PersistentModelLoader -import io.prediction.controller.Utils -import io.prediction.core.BuildInfo - -import com.google.gson.Gson -import com.google.gson.JsonSyntaxException -import grizzled.slf4j.Logging -import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption -import org.apache.log4j.Level -import org.apache.log4j.LogManager -import org.apache.spark.SparkContext -import org.apache.spark.api.java.JavaRDDLike -import org.apache.spark.rdd.RDD -import org.json4s.JsonAST.JValue -import org.json4s.MappingException -import org.json4s._ -import org.json4s.native.JsonMethods._ - -import scala.io.Source -import scala.language.existentials -import scala.reflect.runtime.universe - -/** Collection of reusable workflow related utilities. */ -object WorkflowUtils extends Logging { - @transient private lazy val gson = new Gson - - /** Obtains an Engine object in Scala, or instantiate an Engine in Java. - * - * @param engine Engine factory name. - * @param cl A Java ClassLoader to look for engine-related classes. - * - * @throws ClassNotFoundException - * Thrown when engine factory class does not exist. - * @throws NoSuchMethodException - * Thrown when engine factory's apply() method is not implemented. - */ - def getEngine(engine: String, cl: ClassLoader): (EngineLanguage.Value, EngineFactory) = { - val runtimeMirror = universe.runtimeMirror(cl) - val engineModule = runtimeMirror.staticModule(engine) - val engineObject = runtimeMirror.reflectModule(engineModule) - try { - ( - EngineLanguage.Scala, - engineObject.instance.asInstanceOf[EngineFactory] - ) - } catch { - case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try { - ( - EngineLanguage.Java, - Class.forName(engine).newInstance.asInstanceOf[EngineFactory] - ) - } - } - } - - def getEngineParamsGenerator(epg: String, cl: ClassLoader): - (EngineLanguage.Value, EngineParamsGenerator) = { - val runtimeMirror = universe.runtimeMirror(cl) - val epgModule = runtimeMirror.staticModule(epg) - val epgObject = runtimeMirror.reflectModule(epgModule) - try { - ( - EngineLanguage.Scala, - epgObject.instance.asInstanceOf[EngineParamsGenerator] - ) - } catch { - case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try { - ( - EngineLanguage.Java, - Class.forName(epg).newInstance.asInstanceOf[EngineParamsGenerator] - ) - } - } - } - - def getEvaluation(evaluation: String, cl: ClassLoader): (EngineLanguage.Value, Evaluation) = { - val runtimeMirror = universe.runtimeMirror(cl) - val evaluationModule = runtimeMirror.staticModule(evaluation) - val evaluationObject = runtimeMirror.reflectModule(evaluationModule) - try { - ( - EngineLanguage.Scala, - evaluationObject.instance.asInstanceOf[Evaluation] - ) - } catch { - case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try { - ( - EngineLanguage.Java, - Class.forName(evaluation).newInstance.asInstanceOf[Evaluation] - ) - } - } - } - - /** Converts a JSON document to an instance of Params. - * - * @param language Engine's programming language. - * @param json JSON document. - * @param clazz Class of the component that is going to receive the resulting - * Params instance as a constructor argument. - * @param jsonExtractor JSON extractor option. - * @param formats JSON4S serializers for deserialization. - * - * @throws MappingException Thrown when JSON4S fails to perform conversion. - * @throws JsonSyntaxException Thrown when GSON fails to perform conversion. - */ - def extractParams( - language: EngineLanguage.Value = EngineLanguage.Scala, - json: String, - clazz: Class[_], - jsonExtractor: JsonExtractorOption, - formats: Formats = Utils.json4sDefaultFormats): Params = { - implicit val f = formats - val pClass = clazz.getConstructors.head.getParameterTypes - if (pClass.size == 0) { - if (json != "") { - warn(s"Non-empty parameters supplied to ${clazz.getName}, but its " + - "constructor does not accept any arguments. Stubbing with empty " + - "parameters.") - } - EmptyParams() - } else { - val apClass = pClass.head - try { - JsonExtractor.extract(jsonExtractor, json, apClass, f).asInstanceOf[Params] - } catch { - case e@(_: MappingException | _: JsonSyntaxException) => - error( - s"Unable to extract parameters for ${apClass.getName} from " + - s"JSON string: $json. Aborting workflow.", - e) - throw e - } - } - } - - def getParamsFromJsonByFieldAndClass( - variantJson: JValue, - field: String, - classMap: Map[String, Class[_]], - engineLanguage: EngineLanguage.Value, - jsonExtractor: JsonExtractorOption): (String, Params) = { - variantJson findField { - case JField(f, _) => f == field - case _ => false - } map { jv => - implicit lazy val formats = Utils.json4sDefaultFormats + new NameParamsSerializer - val np: NameParams = try { - jv._2.extract[NameParams] - } catch { - case e: Exception => - error(s"Unable to extract $field name and params $jv") - throw e - } - val extractedParams = np.params.map { p => - try { - if (!classMap.contains(np.name)) { - error(s"Unable to find $field class with name '${np.name}'" + - " defined in Engine.") - sys.exit(1) - } - WorkflowUtils.extractParams( - engineLanguage, - compact(render(p)), - classMap(np.name), - jsonExtractor, - formats) - } catch { - case e: Exception => - error(s"Unable to extract $field params $p") - throw e - } - }.getOrElse(EmptyParams()) - - (np.name, extractedParams) - } getOrElse("", EmptyParams()) - } - - /** Grab environmental variables that starts with 'PIO_'. */ - def pioEnvVars: Map[String, String] = - sys.env.filter(kv => kv._1.startsWith("PIO_")) - - /** Converts Java (non-Scala) objects to a JSON4S JValue. - * - * @param params The Java object to be converted. - */ - def javaObjectToJValue(params: AnyRef): JValue = parse(gson.toJson(params)) - - private[prediction] def checkUpgrade( - component: String = "core", - engine: String = ""): Unit = { - val runner = new Thread(new UpgradeCheckRunner(component, engine)) - runner.start() - } - - // Extract debug string by recursively traversing the data. - def debugString[D](data: D): String = { - val s: String = data match { - case rdd: RDD[_] => { - debugString(rdd.collect()) - } - case javaRdd: JavaRDDLike[_, _] => { - debugString(javaRdd.collect()) - } - case array: Array[_] => { - "[" + array.map(debugString).mkString(",") + "]" - } - case d: AnyRef => { - d.toString - } - case null => "null" - } - s - } - - /** Detect third party software configuration files to be submitted as - * extras to Apache Spark. This makes sure all executors receive the same - * configuration. - */ - def thirdPartyConfFiles: Seq[String] = { - val thirdPartyFiles = Map( - "PIO_CONF_DIR" -> "log4j.properties", - "ES_CONF_DIR" -> "elasticsearch.yml", - "HADOOP_CONF_DIR" -> "core-site.xml", - "HBASE_CONF_DIR" -> "hbase-site.xml") - - thirdPartyFiles.keys.toSeq.map { k: String => - sys.env.get(k) map { x => - val p = Seq(x, thirdPartyFiles(k)).mkString(File.separator) - if (new File(p).exists) Seq(p) else Seq[String]() - } getOrElse Seq[String]() - }.flatten - } - - def thirdPartyClasspaths: Seq[String] = { - val thirdPartyPaths = Seq( - "PIO_CONF_DIR", - "ES_CONF_DIR", - "POSTGRES_JDBC_DRIVER", - "MYSQL_JDBC_DRIVER", - "HADOOP_CONF_DIR", - "HBASE_CONF_DIR") - thirdPartyPaths.map(p => - sys.env.get(p).map(Seq(_)).getOrElse(Seq[String]()) - ).flatten - } - - def modifyLogging(verbose: Boolean): Unit = { - val rootLoggerLevel = if (verbose) Level.TRACE else Level.INFO - val chattyLoggerLevel = if (verbose) Level.INFO else Level.WARN - - LogManager.getRootLogger.setLevel(rootLoggerLevel) - - LogManager.getLogger("org.elasticsearch").setLevel(chattyLoggerLevel) - LogManager.getLogger("org.apache.hadoop").setLevel(chattyLoggerLevel) - LogManager.getLogger("org.apache.spark").setLevel(chattyLoggerLevel) - LogManager.getLogger("org.eclipse.jetty").setLevel(chattyLoggerLevel) - LogManager.getLogger("akka").setLevel(chattyLoggerLevel) - } - - def extractNameParams(jv: JValue): NameParams = { - implicit val formats = Utils.json4sDefaultFormats - val nameOpt = (jv \ "name").extract[Option[String]] - val paramsOpt = (jv \ "params").extract[Option[JValue]] - - if (nameOpt.isEmpty && paramsOpt.isEmpty) { - error("Unable to find 'name' or 'params' fields in" + - s" ${compact(render(jv))}.\n" + - "Since 0.8.4, the 'params' field is required in engine.json" + - " in order to specify parameters for DataSource, Preparator or" + - " Serving.\n" + - "Please go to https://docs.prediction.io/resources/upgrade/" + - " for detailed instruction of how to change engine.json.") - sys.exit(1) - } - - if (nameOpt.isEmpty) { - info(s"No 'name' is found. Default empty String will be used.") - } - - if (paramsOpt.isEmpty) { - info(s"No 'params' is found. Default EmptyParams will be used.") - } - - NameParams( - name = nameOpt.getOrElse(""), - params = paramsOpt - ) - } - - def extractSparkConf(root: JValue): List[(String, String)] = { - def flatten(jv: JValue): List[(List[String], String)] = { - jv match { - case JObject(fields) => - for ((namePrefix, childJV) <- fields; - (name, value) <- flatten(childJV)) - yield (namePrefix :: name) -> value - case JArray(_) => { - error("Arrays are not allowed in the sparkConf section of engine.js.") - sys.exit(1) - } - case JNothing => List() - case _ => List(List() -> jv.values.toString) - } - } - - flatten(root \ "sparkConf").map(x => - (x._1.reduce((a, b) => s"$a.$b"), x._2)) - } -} - -case class NameParams(name: String, params: Option[JValue]) - -class NameParamsSerializer extends CustomSerializer[NameParams](format => ( { - case jv: JValue => WorkflowUtils.extractNameParams(jv) -}, { - case x: NameParams => - JObject(JField("name", JString(x.name)) :: - JField("params", x.params.getOrElse(JNothing)) :: Nil) -} - )) - -/** Collection of reusable workflow related utilities that touch on Apache - * Spark. They are separated to avoid compilation problems with certain code. - */ -object SparkWorkflowUtils extends Logging { - def getPersistentModel[AP <: Params, M]( - pmm: PersistentModelManifest, - runId: String, - params: AP, - sc: Option[SparkContext], - cl: ClassLoader): M = { - val runtimeMirror = universe.runtimeMirror(cl) - val pmmModule = runtimeMirror.staticModule(pmm.className) - val pmmObject = runtimeMirror.reflectModule(pmmModule) - try { - pmmObject.instance.asInstanceOf[PersistentModelLoader[AP, M]]( - runId, - params, - sc) - } catch { - case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try { - val loadMethod = Class.forName(pmm.className).getMethod( - "load", - classOf[String], - classOf[Params], - classOf[SparkContext]) - loadMethod.invoke(null, runId, params, sc.orNull).asInstanceOf[M] - } catch { - case e: ClassNotFoundException => - error(s"Model class ${pmm.className} cannot be found.") - throw e - case e: NoSuchMethodException => - error( - "The load(String, Params, SparkContext) method cannot be found.") - throw e - } - } - } -} - -class UpgradeCheckRunner( - val component: String, - val engine: String) extends Runnable with Logging { - val version = BuildInfo.version - val versionsHost = "https://direct.prediction.io/" - - def run(): Unit = { - val url = if (engine == "") { - s"$versionsHost$version/$component.json" - } else { - s"$versionsHost$version/$component/$engine.json" - } - try { - val upgradeData = Source.fromURL(url) - } catch { - case e: FileNotFoundException => - debug(s"Update metainfo not found. $url") - case e: java.net.UnknownHostException => - debug(s"${e.getClass.getName}: {e.getMessage}") - } - // TODO: Implement upgrade logic - } -} - -class WorkflowInterruption() extends Exception - -case class StopAfterReadInterruption() extends WorkflowInterruption - -case class StopAfterPrepareInterruption() extends WorkflowInterruption - -object EngineLanguage extends Enumeration { - val Scala, Java = Value -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala b/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala new file mode 100644 index 0000000..2fa5551 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/CustomQuerySerializer.scala @@ -0,0 +1,37 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseQuerySerializer + +/** If your query class cannot be automatically serialized/deserialized to/from + * JSON, implement a trait by extending this trait, and overriding the + * `querySerializer` member with your + * [[https://github.com/json4s/json4s#serializing-non-supported-types custom JSON4S serializer]]. + * Algorithm and serving classes using your query class would only need to mix + * in the trait to enable the custom serializer. + * + * @group Helper + */ +trait CustomQuerySerializer extends BaseQuerySerializer + +/** DEPRECATED. Use [[CustomQuerySerializer]] instead. + * + * @group Helper + */ +@deprecated("Use CustomQuerySerializer instead.", "0.9.2") +trait WithQuerySerializer extends CustomQuerySerializer + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala b/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala new file mode 100644 index 0000000..76fe0b3 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/controller/Deployment.scala @@ -0,0 +1,56 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.controller + +import org.apache.predictionio.core.BaseEngine + +import scala.language.implicitConversions + +/** Defines a deployment that contains an [[Engine]] + * + * @group Engine + */ +trait Deployment extends EngineFactory { + protected[this] var _engine: BaseEngine[_, _, _, _] = _ + protected[this] var engineSet: Boolean = false + + /** Returns the [[Engine]] of this [[Deployment]] */ + def apply(): BaseEngine[_, _, _, _] = { + assert(engineSet, "Engine not set") + _engine + } + + /** Returns the [[Engine]] contained in this [[Deployment]]. */ + private [prediction] + def engine: BaseEngine[_, _, _, _] = { + assert(engineSet, "Engine not set") + _engine + } + + /** Sets the [[Engine]] for this [[Deployment]] + * + * @param engine An implementation of [[Engine]] + * @tparam EI Evaluation information class + * @tparam Q Query class + * @tparam P Predicted result class + * @tparam A Actual result class + */ + def engine_=[EI, Q, P, A](engine: BaseEngine[EI, Q, P, A]) { + assert(!engineSet, "Engine can be set at most once") + _engine = engine + engineSet = true + } +}
