[PIO-105] Batch Predictions Implement a new pio batchpredict command to enable massive, fast, batch predictions from a trained model. Read a multi-object JSON file as the input format, with one query object per line. Similarly, write results to a multi-object JSON file, with one prediction result + its original query per line.
Closes #412 Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/cfa3f5da Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/cfa3f5da Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/cfa3f5da Branch: refs/heads/livedoc Commit: cfa3f5dab533a67688ec2f84182eccedc56fa84e Parents: 965c73f Author: Mars Hall <[email protected]> Authored: Tue Aug 1 14:56:47 2017 -0700 Committer: Donald Szeto <[email protected]> Committed: Tue Aug 1 14:56:47 2017 -0700 ---------------------------------------------------------------------- .../predictionio/workflow/BatchPredict.scala | 230 +++++++++++++++++++ .../predictionio/tools/RunBatchPredict.scala | 72 ++++++ .../predictionio/tools/commands/Engine.scala | 55 ++++- .../predictionio/tools/console/Console.scala | 58 ++++- .../apache/predictionio/tools/console/Pio.scala | 13 +- .../tools/console/batchpredict.scala.txt | 25 ++ .../predictionio/tools/console/main.scala.txt | 1 + 7 files changed, 450 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala new file mode 100644 index 0000000..2fb0545 --- /dev/null +++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.workflow + +import java.io.Serializable + +import com.twitter.bijection.Injection +import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator} +import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer +import grizzled.slf4j.Logging +import org.apache.predictionio.controller.{Engine, Utils} +import org.apache.predictionio.core.{BaseAlgorithm, BaseServing, Doer} +import org.apache.predictionio.data.storage.{EngineInstance, Storage} +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption +import org.apache.spark.rdd.RDD +import org.json4s._ +import org.json4s.native.JsonMethods._ +import scala.language.existentials + +case class BatchPredictConfig( + inputFilePath: String = "batchpredict-input.json", + outputFilePath: String = "batchpredict-output.json", + queryPartitions: Option[Int] = None, + engineInstanceId: String = "", + engineId: Option[String] = None, + engineVersion: Option[String] = None, + engineVariant: String = "", + env: Option[String] = None, + verbose: Boolean = false, + debug: Boolean = false, + jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) + +object BatchPredict extends Logging { + + class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator { + override def newKryo(): KryoBase = { + val kryo = super.newKryo() + kryo.setClassLoader(classLoader) + SynchronizedCollectionsSerializer.registerSerializers(kryo) + kryo + } + } + + object KryoInstantiator extends Serializable { + def newKryoInjection : Injection[Any, Array[Byte]] = { + val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader) + KryoInjection.instance(kryoInstantiator) + } + } + + val engineInstances = Storage.getMetaDataEngineInstances + val modeldata = Storage.getModelDataModels + + def main(args: Array[String]): Unit = { + val parser = new scopt.OptionParser[BatchPredictConfig]("BatchPredict") { + opt[String]("input") action { (x, c) => + c.copy(inputFilePath = x) + } text("Path to file containing input queries; a " + + "multi-object JSON file with one object per line.") + opt[String]("output") action { (x, c) => + c.copy(outputFilePath = x) + } text("Path to file containing output predictions; a " + + "multi-object JSON file with one object per line.") + opt[Int]("query-partitions") action { (x, c) => + c.copy(queryPartitions = Some(x)) + } text("Limit concurrency of predictions by setting the number " + + "of partitions used internally for the RDD of queries.") + opt[String]("engineId") action { (x, c) => + c.copy(engineId = Some(x)) + } text("Engine ID.") + opt[String]("engineId") action { (x, c) => + c.copy(engineId = Some(x)) + } text("Engine ID.") + opt[String]("engineVersion") action { (x, c) => + c.copy(engineVersion = Some(x)) + } text("Engine version.") + opt[String]("engine-variant") required() action { (x, c) => + c.copy(engineVariant = x) + } text("Engine variant JSON.") + opt[String]("env") action { (x, c) => + c.copy(env = Some(x)) + } text("Comma-separated list of environmental variables (in 'FOO=BAR' " + + "format) to pass to the Spark execution environment.") + opt[String]("engineInstanceId") required() action { (x, c) => + c.copy(engineInstanceId = x) + } text("Engine instance ID.") + opt[Unit]("verbose") action { (x, c) => + c.copy(verbose = true) + } text("Enable verbose output.") + opt[Unit]("debug") action { (x, c) => + c.copy(debug = true) + } text("Enable debug output.") + opt[String]("json-extractor") action { (x, c) => + c.copy(jsonExtractor = JsonExtractorOption.withName(x)) + } + } + + parser.parse(args, BatchPredictConfig()) map { config => + WorkflowUtils.modifyLogging(config.verbose) + engineInstances.get(config.engineInstanceId) map { engineInstance => + + val engine = getEngine(engineInstance) + + run(config, engineInstance, engine) + + } getOrElse { + error(s"Invalid engine instance ID. Aborting batch predict.") + } + } + } + + def getEngine(engineInstance: EngineInstance): Engine[_, _, _, _, _, _] = { + + val engineFactoryName = engineInstance.engineFactory + + val (engineLanguage, engineFactory) = + WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader) + val maybeEngine = engineFactory() + + // EngineFactory return a base engine, which may not be deployable. + if (!maybeEngine.isInstanceOf[Engine[_,_,_,_,_,_]]) { + throw new NoSuchMethodException( + s"Engine $maybeEngine cannot be used for batch predict") + } + + maybeEngine.asInstanceOf[Engine[_,_,_,_,_,_]] + } + + def run[Q, P]( + config: BatchPredictConfig, + engineInstance: EngineInstance, + engine: Engine[_, _, _, Q, P, _]): Unit = { + + val engineParams = engine.engineInstanceToEngineParams( + engineInstance, config.jsonExtractor) + + val kryo = KryoInstantiator.newKryoInjection + + val modelsFromEngineInstance = + kryo.invert(modeldata.get(engineInstance.id).get.models).get. + asInstanceOf[Seq[Any]] + + val prepareSparkContext = WorkflowContext( + batch = engineInstance.engineFactory, + executorEnv = engineInstance.env, + mode = "Batch Predict (model)", + sparkEnv = engineInstance.sparkConf) + + val models = engine.prepareDeploy( + prepareSparkContext, + engineParams, + engineInstance.id, + modelsFromEngineInstance, + params = WorkflowParams() + ) + + val algorithms = engineParams.algorithmParamsList.map { case (n, p) => + Doer(engine.algorithmClassMap(n), p) + } + + val servingParamsWithName = engineParams.servingParams + + val serving = Doer(engine.servingClassMap(servingParamsWithName._1), + servingParamsWithName._2) + + val runSparkContext = WorkflowContext( + batch = engineInstance.engineFactory, + executorEnv = engineInstance.env, + mode = "Batch Predict (runner)", + sparkEnv = engineInstance.sparkConf) + + val inputRDD: RDD[String] = runSparkContext. + textFile(config.inputFilePath). + filter(_.trim.nonEmpty) + val queriesRDD: RDD[String] = config.queryPartitions match { + case Some(p) => inputRDD.repartition(p) + case None => inputRDD + } + + val predictionsRDD: RDD[String] = queriesRDD.map { queryString => + val jsonExtractorOption = config.jsonExtractor + // Extract Query from Json + val query = JsonExtractor.extract( + jsonExtractorOption, + queryString, + algorithms.head.queryClass, + algorithms.head.querySerializer, + algorithms.head.gsonTypeAdapterFactories + ) + // Deploy logic. First call Serving.supplement, then Algo.predict, + // finally Serving.serve. + val supplementedQuery = serving.supplementBase(query) + // TODO: Parallelize the following. + val predictions = algorithms.zipWithIndex.map { case (a, ai) => + a.predictBase(models(ai), supplementedQuery) + } + // Notice that it is by design to call Serving.serve with the + // *original* query. + val prediction = serving.serveBase(query, predictions) + // Combine query with prediction, so the batch results are + // self-descriptive. + val predictionJValue = JsonExtractor.toJValue( + jsonExtractorOption, + Map("query" -> query, + "prediction" -> prediction), + algorithms.head.querySerializer, + algorithms.head.gsonTypeAdapterFactories) + // Return JSON string + compact(render(predictionJValue)) + } + + predictionsRDD.saveAsTextFile(config.outputFilePath) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala new file mode 100644 index 0000000..35572c9 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.predictionio.tools + +import org.apache.predictionio.tools.Common._ +import org.apache.predictionio.tools.ReturnTypes._ +import org.apache.predictionio.workflow.JsonExtractorOption +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption + +import java.io.File +import grizzled.slf4j.Logging + +import scala.sys.process._ + +case class BatchPredictArgs( + inputFilePath: String = "batchpredict-input.json", + outputFilePath: String = "batchpredict-output.json", + queryPartitions: Option[Int] = None, + variantJson: Option[File] = None, + jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) + + +object RunBatchPredict extends Logging { + + def runBatchPredict( + engineInstanceId: String, + batchPredictArgs: BatchPredictArgs, + sparkArgs: SparkArgs, + pioHome: String, + engineDirPath: String, + verbose: Boolean = false): Expected[(Process, () => Unit)] = { + + val jarFiles = jarFilesForScala(engineDirPath).map(_.toURI) ++ + Option(new File(pioHome, "plugins").listFiles()) + .getOrElse(Array.empty[File]).map(_.toURI) + val args = Seq[String]( + "--input", + batchPredictArgs.inputFilePath, + "--output", + batchPredictArgs.outputFilePath, + "--engineInstanceId", + engineInstanceId, + "--engine-variant", + batchPredictArgs.variantJson.getOrElse( + new File(engineDirPath, "engine.json")).getCanonicalPath) ++ + (if (batchPredictArgs.queryPartitions.isEmpty) Seq() + else Seq("--query-partitions", + batchPredictArgs.queryPartitions.get.toString)) ++ + (if (verbose) Seq("--verbose") else Seq()) ++ + Seq("--json-extractor", batchPredictArgs.jsonExtractor.toString) + + Runner.runOnSpark( + "org.apache.predictionio.workflow.BatchPredict", + args, sparkArgs, jarFiles, pioHome, verbose) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala index e49c3fc..e3460a5 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala @@ -21,8 +21,9 @@ import org.apache.predictionio.core.BuildInfo import org.apache.predictionio.controller.Utils import org.apache.predictionio.data.storage import org.apache.predictionio.tools.EitherLogging -import org.apache.predictionio.tools.{RunWorkflow, RunServer} -import org.apache.predictionio.tools.{DeployArgs, WorkflowArgs, SparkArgs, ServerArgs} +import org.apache.predictionio.tools.{RunWorkflow, RunServer, RunBatchPredict} +import org.apache.predictionio.tools.{ + DeployArgs, WorkflowArgs, SparkArgs, ServerArgs, BatchPredictArgs} import org.apache.predictionio.tools.console.Console import org.apache.predictionio.tools.Common._ import org.apache.predictionio.tools.ReturnTypes._ @@ -262,6 +263,56 @@ object Engine extends EitherLogging { } } + /** Batch predict with an engine. + * + * @param ea An instance of [[EngineArgs]] + * @param engineInstanceId An instance of [[engineInstanceId]] + * @param batchPredictArgs An instance of [[BatchPredictArgs]] + * @param sparkArgs An instance of [[SparkArgs]] + * @param pioHome [[String]] with a path to PIO installation + * @param verbose A [[Boolean]] + * @return An instance of [[Expected]] contaning either [[Left]] + * with an error message or [[Right]] with a handle to process + * of a running angine and a function () => Unit, + * that must be called when the process is complete + */ + def batchPredict( + ea: EngineArgs, + engineInstanceId: Option[String], + batchPredictArgs: BatchPredictArgs, + sparkArgs: SparkArgs, + pioHome: String, + verbose: Boolean = false): Expected[(Process, () => Unit)] = { + + val engineDirPath = getEngineDirPath(ea.engineDir) + val verifyResult = Template.verifyTemplateMinVersion( + new File(engineDirPath, "template.json")) + if (verifyResult.isLeft) { + return Left(verifyResult.left.get) + } + val ei = Console.getEngineInfo( + batchPredictArgs.variantJson.getOrElse(new File(engineDirPath, "engine.json")), + engineDirPath) + val engineInstances = storage.Storage.getMetaDataEngineInstances + val engineInstance = engineInstanceId map { eid => + engineInstances.get(eid) + } getOrElse { + engineInstances.getLatestCompleted( + ei.engineId, ei.engineVersion, ei.variantId) + } + engineInstance map { r => + RunBatchPredict.runBatchPredict( + r.id, batchPredictArgs, sparkArgs, pioHome, engineDirPath, verbose) + } getOrElse { + engineInstanceId map { eid => + logAndFail(s"Invalid engine instance ID ${eid}. Aborting.") + } getOrElse { + logAndFail(s"No valid engine instance found for engine ${ei.engineId} " + + s"${ei.engineVersion}.\nTry running 'train' before 'batchpredict'. Aborting.") + } + } + } + /** Running a driver on spark. * The function starts a process and returns immediately * http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala index 535905a..4a72635 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala @@ -27,7 +27,8 @@ import org.apache.predictionio.tools.commands.{ DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs, BuildArgs, EngineArgs} import org.apache.predictionio.tools.{ - EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs} + EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, + DeployArgs, BatchPredictArgs} import org.apache.predictionio.workflow.{JsonExtractorOption, WorkflowUtils} import org.json4s._ import org.json4s.native.JsonMethods._ @@ -42,6 +43,7 @@ case class ConsoleArgs( workflow: WorkflowArgs = WorkflowArgs(), accessKey: AccessKeyArgs = AccessKeyArgs(), deploy: DeployArgs = DeployArgs(), + batchPredict: BatchPredictArgs = BatchPredictArgs(), eventServer: EventServerArgs = EventServerArgs(), adminServer: AdminServerArgs = AdminServerArgs(), dashboard: DashboardArgs = DashboardArgs(), @@ -323,6 +325,46 @@ object Console extends Logging { } text("Port to unbind from. Default: 8000") ) note("") + cmd("batchpredict"). + text("Use an engine instance to process batch predictions. This\n" + + "command will pass all pass-through arguments to its underlying\n" + + "spark-submit command. All algorithm classes used in the engine\n" + + "must be serializable."). + action { (_, c) => + c.copy(commands = c.commands :+ "batchpredict") + } children( + opt[String]("input") action { (x, c) => + c.copy(batchPredict = c.batchPredict.copy(inputFilePath = x)) + } text("Path to file containing queries; a multi-object JSON file\n" + + "with one query object per line. Accepts any valid Hadoop\n" + + "file URL. Default: batchpredict-input.json"), + opt[String]("output") action { (x, c) => + c.copy(batchPredict = c.batchPredict.copy(outputFilePath = x)) + } text("Path to file to receive results; a multi-object JSON file\n" + + "with one object per line, the prediction + original query.\n" + + "Accepts any valid Hadoop file URL. Actual output will be\n" + + "written as Hadoop partition files in a directory with the\n" + + "output name. Default: batchpredict-output.json"), + opt[Int]("query-partitions") action { (x, c) => + c.copy(batchPredict = c.batchPredict.copy(queryPartitions = Some(x))) + } text("Limit concurrency of predictions by setting the number\n" + + "of partitions used internally for the RDD of queries.\n" + + "Default: number created by Spark context's `textFile`"), + opt[String]("engine-instance-id") action { (x, c) => + c.copy(engineInstanceId = Some(x)) + } text("Engine instance ID."), + opt[String]("json-extractor") action { (x, c) => + c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x))) + } validate { x => + if (JsonExtractorOption.values.map(_.toString).contains(x)) { + success + } else { + val validOptions = JsonExtractorOption.values.mkString("|") + failure(s"$x is not a valid json-extractor option [$validOptions]") + } + } + ) + note("") cmd("dashboard"). text("Launch a dashboard at the specific IP and port."). action { (_, c) => @@ -644,6 +686,19 @@ object Console extends Logging { ca.verbose) case Seq("undeploy") => Pio.undeploy(ca.deploy) + case Seq("batchpredict") => + Pio.batchPredict( + ca.engine, + ca.engineInstanceId, + BatchPredictArgs( + ca.batchPredict.inputFilePath, + ca.batchPredict.outputFilePath, + ca.batchPredict.queryPartitions, + ca.workflow.variantJson, + ca.workflow.jsonExtractor), + ca.spark, + ca.pioHome.get, + ca.verbose) case Seq("dashboard") => Pio.dashboard(ca.dashboard) case Seq("eventserver") => @@ -756,6 +811,7 @@ object Console extends Logging { "build" -> txt.build().toString, "train" -> txt.train().toString, "deploy" -> txt.deploy().toString, + "batchpredict" -> txt.batchpredict().toString, "eventserver" -> txt.eventserver().toString, "adminserver" -> txt.adminserver().toString, "app" -> txt.app().toString, http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala index dd78717..ef4581b 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala @@ -18,7 +18,8 @@ package org.apache.predictionio.tools.console import org.apache.predictionio.tools.{ - EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs} + EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, + DeployArgs, BatchPredictArgs} import org.apache.predictionio.tools.commands.{ DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs, BuildArgs, EngineArgs, Management, Engine, Import, Export, @@ -104,6 +105,16 @@ object Pio extends Logging { def undeploy(da: DeployArgs): Int = Engine.undeploy(da) + def batchPredict( + ea: EngineArgs, + engineInstanceId: Option[String], + batchPredictArgs: BatchPredictArgs, + sparkArgs: SparkArgs, + pioHome: String, + verbose: Boolean = false): Int = + processAwaitAndClean(Engine.batchPredict( + ea, engineInstanceId, batchPredictArgs, sparkArgs, pioHome, verbose)) + def dashboard(da: DashboardArgs): Int = { Management.dashboard(da).awaitTermination 0 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt ---------------------------------------------------------------------- diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt b/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt new file mode 100644 index 0000000..d9d5d74 --- /dev/null +++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/batchpredict.scala.txt @@ -0,0 +1,25 @@ +Usage: pio batchpredict [--input <value>] + [--output <value>] + [--query-partitions <value>] + [--engine-instance-id <value>] + +Use an engine instance to process batch predictions. This command will pass all +pass-through arguments to its underlying spark-submit command. All algorithm +classes used in the engine must be serializable. + + --input <value> + Path to file containing queries; a multi-object JSON file with one + query object per line. Accepts any valid Hadoop file URL. + Default: batchpredict-input.json + --output <value> + Path to file to receive results; a multi-object JSON file with one + object per line, the prediction + original query. Accepts any + valid Hadoop file URL. Actual output will be written as Hadoop + partition files in a directory with the output name. + Default: batchpredict-output.json + --query-partitions <value> + Limit concurrency of predictions by setting the number of partitions + used internally for the RDD of queries. + Default: number created by Spark context's `textFile` + --engine-instance-id <value> + Engine instance ID. Default: the latest trained instance. http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/cfa3f5da/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt ---------------------------------------------------------------------- diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt index 5efa4bf..01be96d 100644 --- a/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt +++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/main.scala.txt @@ -38,6 +38,7 @@ The most commonly used pio commands are: build Build an engine at the current directory train Kick off a training using an engine deploy Deploy an engine as an engine server + batchpredict Process bulk predictions with an engine eventserver Launch an Event Server app Manage apps that are used by the Event Server accesskey Manage app access keys
