http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala new file mode 100644 index 0000000..87aac07 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala @@ -0,0 +1,1277 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.tools.console + +import java.io.File +import java.net.URI + +import grizzled.slf4j.Logging +import org.apache.predictionio.controller.Utils +import org.apache.predictionio.core.BuildInfo +import org.apache.predictionio.data.api.EventServer +import org.apache.predictionio.data.api.EventServerConfig +import org.apache.predictionio.data.storage +import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.data.storage.EngineManifestSerializer +import org.apache.predictionio.data.storage.hbase.upgrade.Upgrade_0_8_3 +import org.apache.predictionio.tools.RegisterEngine +import org.apache.predictionio.tools.RunServer +import org.apache.predictionio.tools.RunWorkflow +import org.apache.predictionio.tools.admin.AdminServer +import org.apache.predictionio.tools.admin.AdminServerConfig +import org.apache.predictionio.tools.dashboard.Dashboard +import org.apache.predictionio.tools.dashboard.DashboardConfig +import org.apache.predictionio.workflow.JsonExtractorOption +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption +import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.commons.io.FileUtils +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write +import semverfi._ + +import scala.collection.JavaConversions._ +import scala.io.Source +import scala.sys.process._ +import scala.util.Random +import scalaj.http.Http + +case class ConsoleArgs( + common: CommonArgs = CommonArgs(), + build: BuildArgs = BuildArgs(), + app: AppArgs = AppArgs(), + accessKey: AccessKeyArgs = AccessKeyArgs(), + deploy: DeployArgs = DeployArgs(), + eventServer: EventServerArgs = EventServerArgs(), + adminServer: AdminServerArgs = AdminServerArgs(), + dashboard: DashboardArgs = DashboardArgs(), + upgrade: UpgradeArgs = UpgradeArgs(), + template: TemplateArgs = TemplateArgs(), + export: ExportArgs = 
ExportArgs(), + imprt: ImportArgs = ImportArgs(), + commands: Seq[String] = Seq(), + metricsClass: Option[String] = None, + metricsParamsJsonPath: Option[String] = None, + paramsPath: String = "params", + engineInstanceId: Option[String] = None, + mainClass: Option[String] = None) + +case class CommonArgs( + batch: String = "", + sparkPassThrough: Seq[String] = Seq(), + driverPassThrough: Seq[String] = Seq(), + pioHome: Option[String] = None, + sparkHome: Option[String] = None, + engineId: Option[String] = None, + engineVersion: Option[String] = None, + engineFactory: Option[String] = None, + engineParamsKey: Option[String] = None, + evaluation: Option[String] = None, + engineParamsGenerator: Option[String] = None, + variantJson: File = new File("engine.json"), + manifestJson: File = new File("manifest.json"), + stopAfterRead: Boolean = false, + stopAfterPrepare: Boolean = false, + skipSanityCheck: Boolean = false, + verbose: Boolean = false, + verbosity: Int = 0, + sparkKryo: Boolean = false, + scratchUri: Option[URI] = None, + jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) + +case class BuildArgs( + sbt: Option[File] = None, + sbtExtra: Option[String] = None, + sbtAssemblyPackageDependency: Boolean = true, + sbtClean: Boolean = false, + uberJar: Boolean = false, + forceGeneratePIOSbt: Boolean = false) + +case class DeployArgs( + ip: String = "0.0.0.0", + port: Int = 8000, + logUrl: Option[String] = None, + logPrefix: Option[String] = None) + +case class EventServerArgs( + enabled: Boolean = false, + ip: String = "0.0.0.0", + port: Int = 7070, + stats: Boolean = false) + +case class AdminServerArgs( +ip: String = "127.0.0.1", +port: Int = 7071) + +case class DashboardArgs( + ip: String = "127.0.0.1", + port: Int = 9000) + +case class UpgradeArgs( + from: String = "0.0.0", + to: String = "0.0.0", + oldAppId: Int = 0, + newAppId: Int = 0 +) + +object Console extends Logging { + def main(args: Array[String]): Unit = { + val parser = new 
scopt.OptionParser[ConsoleArgs]("pio") { + override def showUsageOnError: Boolean = false + head("PredictionIO Command Line Interface Console", BuildInfo.version) + help("") + note("Note that it is possible to supply pass-through arguments at\n" + + "the end of the command by using a '--' separator, e.g.\n\n" + + "pio train --params-path params -- --master spark://mycluster:7077\n" + + "\nIn the example above, the '--master' argument will be passed to\n" + + "underlying spark-submit command. Please refer to the usage section\n" + + "for each command for more information.\n\n" + + "The following options are common to all commands:\n") + opt[String]("pio-home") action { (x, c) => + c.copy(common = c.common.copy(pioHome = Some(x))) + } text("Root directory of a PredictionIO installation.\n" + + " Specify this if automatic discovery fail.") + opt[String]("spark-home") action { (x, c) => + c.copy(common = c.common.copy(sparkHome = Some(x))) + } text("Root directory of an Apache Spark installation.\n" + + " If not specified, will try to use the SPARK_HOME\n" + + " environmental variable. If this fails as well, default to\n" + + " current directory.") + opt[String]("engine-id") abbr("ei") action { (x, c) => + c.copy(common = c.common.copy(engineId = Some(x))) + } text("Specify an engine ID. Usually used by distributed deployment.") + opt[String]("engine-version") abbr("ev") action { (x, c) => + c.copy(common = c.common.copy(engineVersion = Some(x))) + } text("Specify an engine version. Usually used by distributed " + + "deployment.") + opt[File]("variant") abbr("v") action { (x, c) => + c.copy(common = c.common.copy(variantJson = x)) + } + opt[File]("manifest") abbr("m") action { (x, c) => + c.copy(common = c.common.copy(manifestJson = x)) + } + opt[File]("sbt") action { (x, c) => + c.copy(build = c.build.copy(sbt = Some(x))) + } validate { x => + if (x.exists) { + success + } else { + failure(s"${x.getCanonicalPath} does not exist.") + } + } text("Path to sbt. 
Default: sbt") + opt[Unit]("verbose") action { (x, c) => + c.copy(common = c.common.copy(verbose = true)) + } + opt[Unit]("spark-kryo") abbr("sk") action { (x, c) => + c.copy(common = c.common.copy(sparkKryo = true)) + } + opt[String]("scratch-uri") action { (x, c) => + c.copy(common = c.common.copy(scratchUri = Some(new URI(x)))) + } + note("") + cmd("version"). + text("Displays the version of this command line console."). + action { (_, c) => + c.copy(commands = c.commands :+ "version") + } + note("") + cmd("help").action { (_, c) => + c.copy(commands = c.commands :+ "help") + } children( + arg[String]("<command>") optional() + action { (x, c) => + c.copy(commands = c.commands :+ x) + } + ) + note("") + cmd("build"). + text("Build an engine at the current directory."). + action { (_, c) => + c.copy(commands = c.commands :+ "build") + } children( + opt[String]("sbt-extra") action { (x, c) => + c.copy(build = c.build.copy(sbtExtra = Some(x))) + } text("Extra command to pass to SBT when it builds your engine."), + opt[Unit]("clean") action { (x, c) => + c.copy(build = c.build.copy(sbtClean = true)) + } text("Clean build."), + opt[Unit]("no-asm") action { (x, c) => + c.copy(build = c.build.copy(sbtAssemblyPackageDependency = false)) + } text("Skip building external dependencies assembly."), + opt[Unit]("uber-jar") action { (x, c) => + c.copy(build = c.build.copy(uberJar = true)) + }, + opt[Unit]("generate-pio-sbt") action { (x, c) => + c.copy(build = c.build.copy(forceGeneratePIOSbt = true)) + } + ) + note("") + cmd("unregister"). + text("Unregister an engine at the current directory."). + action { (_, c) => + c.copy(commands = c.commands :+ "unregister") + } + note("") + cmd("train"). + text("Kick off a training using an engine. This will produce an\n" + + "engine instance. This command will pass all pass-through\n" + + "arguments to its underlying spark-submit command."). 
+ action { (_, c) => + c.copy(commands = c.commands :+ "train") + } children( + opt[String]("batch") action { (x, c) => + c.copy(common = c.common.copy(batch = x)) + } text("Batch label of the run."), + opt[String]("params-path") action { (x, c) => + c.copy(paramsPath = x) + } text("Directory to lookup parameters JSON files. Default: params"), + opt[String]("metrics-params") abbr("mp") action { (x, c) => + c.copy(metricsParamsJsonPath = Some(x)) + } text("Metrics parameters JSON file. Will try to use\n" + + " metrics.json in the base path."), + opt[Unit]("skip-sanity-check") abbr("ssc") action { (x, c) => + c.copy(common = c.common.copy(skipSanityCheck = true)) + }, + opt[Unit]("stop-after-read") abbr("sar") action { (x, c) => + c.copy(common = c.common.copy(stopAfterRead = true)) + }, + opt[Unit]("stop-after-prepare") abbr("sap") action { (x, c) => + c.copy(common = c.common.copy(stopAfterPrepare = true)) + }, + opt[Unit]("uber-jar") action { (x, c) => + c.copy(build = c.build.copy(uberJar = true)) + }, + opt[Int]("verbosity") action { (x, c) => + c.copy(common = c.common.copy(verbosity = x)) + }, + opt[String]("engine-factory") action { (x, c) => + c.copy(common = c.common.copy(engineFactory = Some(x))) + }, + opt[String]("engine-params-key") action { (x, c) => + c.copy(common = c.common.copy(engineParamsKey = Some(x))) + }, + opt[String]("json-extractor") action { (x, c) => + c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x))) + } validate { x => + if (JsonExtractorOption.values.map(_.toString).contains(x)) { + success + } else { + val validOptions = JsonExtractorOption.values.mkString("|") + failure(s"$x is not a valid json-extractor option [$validOptions]") + } + } + ) + note("") + cmd("eval"). + text("Kick off an evaluation using an engine. This will produce an\n" + + "engine instance. This command will pass all pass-through\n" + + "arguments to its underlying spark-submit command."). 
+ action { (_, c) => + c.copy(commands = c.commands :+ "eval") + } children( + arg[String]("<evaluation-class>") action { (x, c) => + c.copy(common = c.common.copy(evaluation = Some(x))) + }, + arg[String]("[<engine-parameters-generator-class>]") optional() action { (x, c) => + c.copy(common = c.common.copy(engineParamsGenerator = Some(x))) + } text("Optional engine parameters generator class, overriding the first argument"), + opt[String]("batch") action { (x, c) => + c.copy(common = c.common.copy(batch = x)) + } text("Batch label of the run."), + opt[String]("json-extractor") action { (x, c) => + c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x))) + } validate { x => + if (JsonExtractorOption.values.map(_.toString).contains(x)) { + success + } else { + val validOptions = JsonExtractorOption.values.mkString("|") + failure(s"$x is not a valid json-extractor option [$validOptions]") + } + } + ) + note("") + cmd("deploy"). + text("Deploy an engine instance as a prediction server. This\n" + + "command will pass all pass-through arguments to its underlying\n" + + "spark-submit command."). + action { (_, c) => + c.copy(commands = c.commands :+ "deploy") + } children( + opt[String]("batch") action { (x, c) => + c.copy(common = c.common.copy(batch = x)) + } text("Batch label of the deployment."), + opt[String]("engine-instance-id") action { (x, c) => + c.copy(engineInstanceId = Some(x)) + } text("Engine instance ID."), + opt[String]("ip") action { (x, c) => + c.copy(deploy = c.deploy.copy(ip = x)) + }, + opt[Int]("port") action { (x, c) => + c.copy(deploy = c.deploy.copy(port = x)) + } text("Port to bind to. 
Default: 8000"), + opt[Unit]("feedback") action { (_, c) => + c.copy(eventServer = c.eventServer.copy(enabled = true)) + } text("Enable feedback loop to event server."), + opt[String]("event-server-ip") action { (x, c) => + c.copy(eventServer = c.eventServer.copy(ip = x)) + }, + opt[Int]("event-server-port") action { (x, c) => + c.copy(eventServer = c.eventServer.copy(port = x)) + } text("Event server port. Default: 7070"), + opt[Int]("admin-server-port") action { (x, c) => + c.copy(adminServer = c.adminServer.copy(port = x)) + } text("Admin server port. Default: 7071"), + opt[String]("admin-server-port") action { (x, c) => + c.copy(adminServer = c.adminServer.copy(ip = x)) + } text("Admin server IP. Default: localhost"), + opt[String]("accesskey") action { (x, c) => + c.copy(accessKey = c.accessKey.copy(accessKey = x)) + } text("Access key of the App where feedback data will be stored."), + opt[Unit]("uber-jar") action { (x, c) => + c.copy(build = c.build.copy(uberJar = true)) + }, + opt[String]("log-url") action { (x, c) => + c.copy(deploy = c.deploy.copy(logUrl = Some(x))) + }, + opt[String]("log-prefix") action { (x, c) => + c.copy(deploy = c.deploy.copy(logPrefix = Some(x))) + }, + opt[String]("json-extractor") action { (x, c) => + c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x))) + } validate { x => + if (JsonExtractorOption.values.map(_.toString).contains(x)) { + success + } else { + val validOptions = JsonExtractorOption.values.mkString("|") + failure(s"$x is not a valid json-extractor option [$validOptions]") + } + } + ) + note("") + cmd("undeploy"). + text("Undeploy an engine instance as a prediction server."). + action { (_, c) => + c.copy(commands = c.commands :+ "undeploy") + } children( + opt[String]("ip") action { (x, c) => + c.copy(deploy = c.deploy.copy(ip = x)) + }, + opt[Int]("port") action { (x, c) => + c.copy(deploy = c.deploy.copy(port = x)) + } text("Port to unbind from. 
Default: 8000") + ) + note("") + cmd("dashboard"). + text("Launch a dashboard at the specific IP and port."). + action { (_, c) => + c.copy(commands = c.commands :+ "dashboard") + } children( + opt[String]("ip") action { (x, c) => + c.copy(dashboard = c.dashboard.copy(ip = x)) + }, + opt[Int]("port") action { (x, c) => + c.copy(dashboard = c.dashboard.copy(port = x)) + } text("Port to bind to. Default: 9000") + ) + note("") + cmd("eventserver"). + text("Launch an Event Server at the specific IP and port."). + action { (_, c) => + c.copy(commands = c.commands :+ "eventserver") + } children( + opt[String]("ip") action { (x, c) => + c.copy(eventServer = c.eventServer.copy(ip = x)) + }, + opt[Int]("port") action { (x, c) => + c.copy(eventServer = c.eventServer.copy(port = x)) + } text("Port to bind to. Default: 7070"), + opt[Unit]("stats") action { (x, c) => + c.copy(eventServer = c.eventServer.copy(stats = true)) + } + ) + cmd("adminserver"). + text("Launch an Admin Server at the specific IP and port."). + action { (_, c) => + c.copy(commands = c.commands :+ "adminserver") + } children( + opt[String]("ip") action { (x, c) => + c.copy(adminServer = c.adminServer.copy(ip = x)) + } text("IP to bind to. Default: localhost"), + opt[Int]("port") action { (x, c) => + c.copy(adminServer = c.adminServer.copy(port = x)) + } text("Port to bind to. Default: 7071") + ) + note("") + cmd("run"). + text("Launch a driver program. This command will pass all\n" + + "pass-through arguments to its underlying spark-submit command.\n" + + "In addition, it also supports a second level of pass-through\n" + + "arguments to the driver program, e.g.\n" + + "pio run -- --master spark://localhost:7077 -- --driver-arg foo"). 
+ action { (_, c) => + c.copy(commands = c.commands :+ "run") + } children( + arg[String]("<main class>") action { (x, c) => + c.copy(mainClass = Some(x)) + } text("Main class name of the driver program."), + opt[String]("sbt-extra") action { (x, c) => + c.copy(build = c.build.copy(sbtExtra = Some(x))) + } text("Extra command to pass to SBT when it builds your engine."), + opt[Unit]("clean") action { (x, c) => + c.copy(build = c.build.copy(sbtClean = true)) + } text("Clean build."), + opt[Unit]("no-asm") action { (x, c) => + c.copy(build = c.build.copy(sbtAssemblyPackageDependency = false)) + } text("Skip building external dependencies assembly.") + ) + note("") + cmd("status"). + text("Displays status information about the PredictionIO system."). + action { (_, c) => + c.copy(commands = c.commands :+ "status") + } + note("") + cmd("upgrade"). + text("Upgrade tool"). + action { (_, c) => + c.copy(commands = c.commands :+ "upgrade") + } children( + arg[String]("<from version>") action { (x, c) => + c.copy(upgrade = c.upgrade.copy(from = x)) + } text("The version upgraded from."), + arg[String]("<to version>") action { (x, c) => + c.copy(upgrade = c.upgrade.copy(to = x)) + } text("The version upgraded to."), + arg[Int]("<old App ID>") action { (x, c) => + c.copy(upgrade = c.upgrade.copy(oldAppId = x)) + } text("Old App ID."), + arg[Int]("<new App ID>") action { (x, c) => + c.copy(upgrade = c.upgrade.copy(newAppId = x)) + } text("New App ID.") + ) + note("") + cmd("app"). + text("Manage apps.\n"). + action { (_, c) => + c.copy(commands = c.commands :+ "app") + } children( + cmd("new"). + text("Create a new app key to app ID mapping."). 
+ action { (_, c) => + c.copy(commands = c.commands :+ "new") + } children( + opt[Int]("id") action { (x, c) => + c.copy(app = c.app.copy(id = Some(x))) + }, + opt[String]("description") action { (x, c) => + c.copy(app = c.app.copy(description = Some(x))) + }, + opt[String]("access-key") action { (x, c) => + c.copy(accessKey = c.accessKey.copy(accessKey = x)) + }, + arg[String]("<name>") action { (x, c) => + c.copy(app = c.app.copy(name = x)) + } + ), + note(""), + cmd("list"). + text("List all apps."). + action { (_, c) => + c.copy(commands = c.commands :+ "list") + }, + note(""), + cmd("show"). + text("Show details of an app."). + action { (_, c) => + c.copy(commands = c.commands :+ "show") + } children ( + arg[String]("<name>") action { (x, c) => + c.copy(app = c.app.copy(name = x)) + } text("Name of the app to be shown.") + ), + note(""), + cmd("delete"). + text("Delete an app."). + action { (_, c) => + c.copy(commands = c.commands :+ "delete") + } children( + arg[String]("<name>") action { (x, c) => + c.copy(app = c.app.copy(name = x)) + } text("Name of the app to be deleted."), + opt[Unit]("force") abbr("f") action { (x, c) => + c.copy(app = c.app.copy(force = true)) + } text("Delete an app without prompting for confirmation") + ), + note(""), + cmd("data-delete"). + text("Delete data of an app"). 
+ action { (_, c) => + c.copy(commands = c.commands :+ "data-delete") + } children( + arg[String]("<name>") action { (x, c) => + c.copy(app = c.app.copy(name = x)) + } text("Name of the app whose data to be deleted."), + opt[String]("channel") action { (x, c) => + c.copy(app = c.app.copy(dataDeleteChannel = Some(x))) + } text("Name of channel whose data to be deleted."), + opt[Unit]("all") action { (x, c) => + c.copy(app = c.app.copy(all = true)) + } text("Delete data of all channels including default"), + opt[Unit]("force") abbr("f") action { (x, c) => + c.copy(app = c.app.copy(force = true)) + } text("Delete data of an app without prompting for confirmation") + ), + note(""), + cmd("channel-new"). + text("Create a new channel for the app."). + action { (_, c) => + c.copy(commands = c.commands :+ "channel-new") + } children ( + arg[String]("<name>") action { (x, c) => + c.copy(app = c.app.copy(name = x)) + } text("App name."), + arg[String]("<channel>") action { (x, c) => + c.copy(app = c.app.copy(channel = x)) + } text ("Channel name to be created.") + ), + note(""), + cmd("channel-delete"). + text("Delete a channel of the app."). + action { (_, c) => + c.copy(commands = c.commands :+ "channel-delete") + } children ( + arg[String]("<name>") action { (x, c) => + c.copy(app = c.app.copy(name = x)) + } text("App name."), + arg[String]("<channel>") action { (x, c) => + c.copy(app = c.app.copy(channel = x)) + } text ("Channel name to be deleted."), + opt[Unit]("force") abbr("f") action { (x, c) => + c.copy(app = c.app.copy(force = true)) + } text("Delete a channel of the app without prompting for confirmation") + ) + ) + note("") + cmd("accesskey"). + text("Manage app access keys.\n"). + action { (_, c) => + c.copy(commands = c.commands :+ "accesskey") + } children( + cmd("new"). + text("Add allowed event(s) to an access key."). 
+ action { (_, c) => + c.copy(commands = c.commands :+ "new") + } children( + opt[String]("key") action { (x, c) => + c.copy(accessKey = c.accessKey.copy(accessKey = x)) + }, + arg[String]("<app name>") action { (x, c) => + c.copy(app = c.app.copy(name = x)) + }, + arg[String]("[<event1> <event2> ...]") unbounded() optional() + action { (x, c) => + c.copy(accessKey = c.accessKey.copy( + events = c.accessKey.events :+ x)) + } + ), + cmd("list"). + text("List all access keys of an app."). + action { (_, c) => + c.copy(commands = c.commands :+ "list") + } children( + arg[String]("<app name>") optional() action { (x, c) => + c.copy(app = c.app.copy(name = x)) + } text("App name.") + ), + note(""), + cmd("delete"). + text("Delete an access key."). + action { (_, c) => + c.copy(commands = c.commands :+ "delete") + } children( + arg[String]("<access key>") action { (x, c) => + c.copy(accessKey = c.accessKey.copy(accessKey = x)) + } text("The access key to be deleted.") + ) + ) + cmd("template"). + action { (_, c) => + c.copy(commands = c.commands :+ "template") + } children( + cmd("get"). + action { (_, c) => + c.copy(commands = c.commands :+ "get") + } children( + arg[String]("<template ID>") required() action { (x, c) => + c.copy(template = c.template.copy(repository = x)) + }, + arg[String]("<new engine directory>") action { (x, c) => + c.copy(template = c.template.copy(directory = x)) + }, + opt[String]("version") action { (x, c) => + c.copy(template = c.template.copy(version = Some(x))) + }, + opt[String]("name") action { (x, c) => + c.copy(template = c.template.copy(name = Some(x))) + }, + opt[String]("package") action { (x, c) => + c.copy(template = c.template.copy(packageName = Some(x))) + }, + opt[String]("email") action { (x, c) => + c.copy(template = c.template.copy(email = Some(x))) + } + ), + cmd("list"). + action { (_, c) => + c.copy(commands = c.commands :+ "list") + } + ) + cmd("export"). 
+ action { (_, c) => + c.copy(commands = c.commands :+ "export") + } children( + opt[Int]("appid") required() action { (x, c) => + c.copy(export = c.export.copy(appId = x)) + }, + opt[String]("output") required() action { (x, c) => + c.copy(export = c.export.copy(outputPath = x)) + }, + opt[String]("format") action { (x, c) => + c.copy(export = c.export.copy(format = x)) + }, + opt[String]("channel") action { (x, c) => + c.copy(export = c.export.copy(channel = Some(x))) + } + ) + cmd("import"). + action { (_, c) => + c.copy(commands = c.commands :+ "import") + } children( + opt[Int]("appid") required() action { (x, c) => + c.copy(imprt = c.imprt.copy(appId = x)) + }, + opt[String]("input") required() action { (x, c) => + c.copy(imprt = c.imprt.copy(inputPath = x)) + }, + opt[String]("channel") action { (x, c) => + c.copy(imprt = c.imprt.copy(channel = Some(x))) + } + ) + } + + val separatorIndex = args.indexWhere(_ == "--") + val (consoleArgs, theRest) = + if (separatorIndex == -1) { + (args, Array[String]()) + } else { + args.splitAt(separatorIndex) + } + val allPassThroughArgs = theRest.drop(1) + val secondSepIdx = allPassThroughArgs.indexWhere(_ == "--") + val (sparkPassThroughArgs, driverPassThroughArgs) = + if (secondSepIdx == -1) { + (allPassThroughArgs, Array[String]()) + } else { + val t = allPassThroughArgs.splitAt(secondSepIdx) + (t._1, t._2.drop(1)) + } + + parser.parse(consoleArgs, ConsoleArgs()) map { pca => + val ca = pca.copy(common = pca.common.copy( + sparkPassThrough = sparkPassThroughArgs, + driverPassThrough = driverPassThroughArgs)) + WorkflowUtils.modifyLogging(ca.common.verbose) + val rv: Int = ca.commands match { + case Seq("") => + System.err.println(help()) + 1 + case Seq("version") => + version(ca) + 0 + case Seq("build") => + regenerateManifestJson(ca.common.manifestJson) + build(ca) + case Seq("unregister") => + unregister(ca) + 0 + case Seq("train") => + regenerateManifestJson(ca.common.manifestJson) + train(ca) + case Seq("eval") => + 
regenerateManifestJson(ca.common.manifestJson) + train(ca) + case Seq("deploy") => + deploy(ca) + case Seq("undeploy") => + undeploy(ca) + case Seq("dashboard") => + dashboard(ca) + 0 + case Seq("eventserver") => + eventserver(ca) + 0 + case Seq("adminserver") => + adminserver(ca) + 0 + case Seq("run") => + generateManifestJson(ca.common.manifestJson) + run(ca) + case Seq("status") => + status(ca) + case Seq("upgrade") => + upgrade(ca) + 0 + case Seq("app", "new") => + App.create(ca) + case Seq("app", "list") => + App.list(ca) + case Seq("app", "show") => + App.show(ca) + case Seq("app", "delete") => + App.delete(ca) + case Seq("app", "data-delete") => + App.dataDelete(ca) + case Seq("app", "channel-new") => + App.channelNew(ca) + case Seq("app", "channel-delete") => + App.channelDelete(ca) + case Seq("accesskey", "new") => + AccessKey.create(ca) + case Seq("accesskey", "list") => + AccessKey.list(ca) + case Seq("accesskey", "delete") => + AccessKey.delete(ca) + case Seq("template", "get") => + Template.get(ca) + case Seq("template", "list") => + Template.list(ca) + case Seq("export") => + Export.eventsToFile(ca) + case Seq("import") => + Import.fileToEvents(ca) + case _ => + System.err.println(help(ca.commands)) + 1 + } + sys.exit(rv) + } getOrElse { + val command = args.toSeq.filterNot(_.startsWith("--")).head + System.err.println(help(Seq(command))) + sys.exit(1) + } + } + + def help(commands: Seq[String] = Seq()): String = { + if (commands.isEmpty) { + mainHelp + } else { + val stripped = + (if (commands.head == "help") commands.drop(1) else commands). 
+ mkString("-") + helpText.getOrElse(stripped, s"Help is unavailable for ${stripped}.") + } + } + + val mainHelp = txt.main().toString + + val helpText = Map( + "" -> mainHelp, + "status" -> txt.status().toString, + "upgrade" -> txt.upgrade().toString, + "version" -> txt.version().toString, + "template" -> txt.template().toString, + "build" -> txt.build().toString, + "train" -> txt.train().toString, + "deploy" -> txt.deploy().toString, + "eventserver" -> txt.eventserver().toString, + "adminserver" -> txt.adminserver().toString, + "app" -> txt.app().toString, + "accesskey" -> txt.accesskey().toString, + "import" -> txt.imprt().toString, + "export" -> txt.export().toString, + "run" -> txt.run().toString, + "eval" -> txt.eval().toString, + "dashboard" -> txt.dashboard().toString) + + def version(ca: ConsoleArgs): Unit = println(BuildInfo.version) + + def build(ca: ConsoleArgs): Int = { + Template.verifyTemplateMinVersion(new File("template.json")) + compile(ca) + info("Looking for an engine...") + val jarFiles = jarFilesForScala + if (jarFiles.isEmpty) { + error("No engine found. Your build might have failed. 
Aborting.") + return 1 + } + jarFiles foreach { f => info(s"Found ${f.getName}")} + RegisterEngine.registerEngine( + ca.common.manifestJson, + jarFiles, + false) + info("Your engine is ready for training.") + 0 + } + + def unregister(ca: ConsoleArgs): Unit = { + RegisterEngine.unregisterEngine(ca.common.manifestJson) + } + + def train(ca: ConsoleArgs): Int = { + Template.verifyTemplateMinVersion(new File("template.json")) + withRegisteredManifest( + ca.common.manifestJson, + ca.common.engineId, + ca.common.engineVersion) { em => + RunWorkflow.newRunWorkflow(ca, em) + } + } + + def deploy(ca: ConsoleArgs): Int = { + Template.verifyTemplateMinVersion(new File("template.json")) + withRegisteredManifest( + ca.common.manifestJson, + ca.common.engineId, + ca.common.engineVersion) { em => + val variantJson = parse(Source.fromFile(ca.common.variantJson).mkString) + val variantId = variantJson \ "id" match { + case JString(s) => s + case _ => + error("Unable to read engine variant ID from " + + s"${ca.common.variantJson.getCanonicalPath}. Aborting.") + return 1 + } + val engineInstances = storage.Storage.getMetaDataEngineInstances + val engineInstance = ca.engineInstanceId map { eid => + engineInstances.get(eid) + } getOrElse { + engineInstances.getLatestCompleted(em.id, em.version, variantId) + } + engineInstance map { r => + RunServer.newRunServer(ca, em, r.id) + } getOrElse { + ca.engineInstanceId map { eid => + error( + s"Invalid engine instance ID ${ca.engineInstanceId}. Aborting.") + } getOrElse { + error( + s"No valid engine instance found for engine ${em.id} " + + s"${em.version}.\nTry running 'train' before 'deploy'. 
Aborting.") + } + 1 + } + } + } + + def dashboard(ca: ConsoleArgs): Unit = { + info(s"Creating dashboard at ${ca.dashboard.ip}:${ca.dashboard.port}") + Dashboard.createDashboard(DashboardConfig( + ip = ca.dashboard.ip, + port = ca.dashboard.port)) + } + + def eventserver(ca: ConsoleArgs): Unit = { + info( + s"Creating Event Server at ${ca.eventServer.ip}:${ca.eventServer.port}") + EventServer.createEventServer(EventServerConfig( + ip = ca.eventServer.ip, + port = ca.eventServer.port, + stats = ca.eventServer.stats)) + } + + def adminserver(ca: ConsoleArgs): Unit = { + info( + s"Creating Admin Server at ${ca.adminServer.ip}:${ca.adminServer.port}") + AdminServer.createAdminServer(AdminServerConfig( + ip = ca.adminServer.ip, + port = ca.adminServer.port + )) + } + + def undeploy(ca: ConsoleArgs): Int = { + val serverUrl = s"http://${ca.deploy.ip}:${ca.deploy.port}" + info( + s"Undeploying any existing engine instance at ${serverUrl}") + try { + val code = Http(s"${serverUrl}/stop").asString.code + code match { + case 200 => 0 + case 404 => + error(s"Another process is using ${serverUrl}. Unable to undeploy.") + 1 + case _ => + error(s"Another process is using ${serverUrl}, or an existing " + + s"engine server is not responding properly (HTTP ${code}). " + + "Unable to undeploy.") + 1 + } + } catch { + case e: java.net.ConnectException => + warn(s"Nothing at ${serverUrl}") + 0 + case _: Throwable => + error("Another process might be occupying " + + s"${ca.deploy.ip}:${ca.deploy.port}. 
Unable to undeploy.") + 1 + } + } + + def compile(ca: ConsoleArgs): Unit = { + // only add pioVersion to sbt if project/pio.sbt exists + if (new File("project", "pio-build.sbt").exists || ca.build.forceGeneratePIOSbt) { + FileUtils.writeLines( + new File("pio.sbt"), + Seq( + "// Generated automatically by pio build.", + "// Changes in this file will be overridden.", + "", + "pioVersion := \"" + BuildInfo.version + "\"")) + } + implicit val formats = Utils.json4sDefaultFormats + try { + val engineFactory = + (parse(Source.fromFile("engine.json").mkString) \ "engineFactory"). + extract[String] + WorkflowUtils.checkUpgrade("build", engineFactory) + } catch { + case e: Throwable => WorkflowUtils.checkUpgrade("build") + } + val sbt = detectSbt(ca) + info(s"Using command '${sbt}' at the current working directory to build.") + info("If the path above is incorrect, this process will fail.") + val asm = + if (ca.build.sbtAssemblyPackageDependency) { + " assemblyPackageDependency" + } else { + "" + } + val clean = if (ca.build.sbtClean) " clean" else "" + val buildCmd = s"${sbt} ${ca.build.sbtExtra.getOrElse("")}${clean} " + + (if (ca.build.uberJar) "assembly" else s"package${asm}") + val core = new File(s"pio-assembly-${BuildInfo.version}.jar") + if (ca.build.uberJar) { + info(s"Uber JAR enabled. Putting ${core.getName} in lib.") + val dst = new File("lib") + dst.mkdir() + FileUtils.copyFileToDirectory( + coreAssembly(ca.common.pioHome.get), + dst, + true) + } else { + if (new File("engine.json").exists()) { + info(s"Uber JAR disabled. Making sure lib/${core.getName} is absent.") + new File("lib", core.getName).delete() + } else { + info("Uber JAR disabled, but current working directory does not look " + + s"like an engine project directory. 
Please delete lib/${core.getName} manually.") + } + } + info(s"Going to run: ${buildCmd}") + try { + val r = + if (ca.common.verbose) { + buildCmd.!(ProcessLogger(line => info(line), line => error(line))) + } else { + buildCmd.!(ProcessLogger( + line => outputSbtError(line), + line => outputSbtError(line))) + } + if (r != 0) { + error(s"Return code of previous step is ${r}. Aborting.") + sys.exit(1) + } + info("Build finished successfully.") + } catch { + case e: java.io.IOException => + error(s"${e.getMessage}") + sys.exit(1) + } + } + + private def outputSbtError(line: String): Unit = { + """\[.*error.*\]""".r findFirstIn line foreach { _ => error(line) } + } + + def run(ca: ConsoleArgs): Int = { + compile(ca) + + val extraFiles = WorkflowUtils.thirdPartyConfFiles + + val jarFiles = jarFilesForScala + jarFiles foreach { f => info(s"Found JAR: ${f.getName}") } + val allJarFiles = jarFiles.map(_.getCanonicalPath) + val cmd = s"${getSparkHome(ca.common.sparkHome)}/bin/spark-submit --jars " + + s"${allJarFiles.mkString(",")} " + + (if (extraFiles.size > 0) { + s"--files ${extraFiles.mkString(",")} " + } else { + "" + }) + + "--class " + + s"${ca.mainClass.get} ${ca.common.sparkPassThrough.mkString(" ")} " + + coreAssembly(ca.common.pioHome.get) + " " + + ca.common.driverPassThrough.mkString(" ") + val proc = Process( + cmd, + None, + "SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")). + map(kv => s"${kv._1}=${kv._2}").mkString(",")) + info(s"Submission command: ${cmd}") + val r = proc.! + if (r != 0) { + error(s"Return code of previous step is ${r}. Aborting.") + return 1 + } + r + } + + def status(ca: ConsoleArgs): Int = { + info("Inspecting PredictionIO...") + ca.common.pioHome map { pioHome => + info(s"PredictionIO ${BuildInfo.version} is installed at $pioHome") + } getOrElse { + error("Unable to locate PredictionIO installation. 
Aborting.") + return 1 + } + info("Inspecting Apache Spark...") + val sparkHome = getSparkHome(ca.common.sparkHome) + if (new File(s"$sparkHome/bin/spark-submit").exists) { + info(s"Apache Spark is installed at $sparkHome") + val sparkMinVersion = "1.3.0" + val sparkReleaseFile = new File(s"$sparkHome/RELEASE") + if (sparkReleaseFile.exists) { + val sparkReleaseStrings = + Source.fromFile(sparkReleaseFile).mkString.split(' ') + if (sparkReleaseStrings.length < 2) { + warn(stripMarginAndNewlines( + s"""|Apache Spark version information cannot be found (RELEASE file + |is empty). This is a known issue for certain vendors (e.g. + |Cloudera). Please make sure you are using a version of at least + |$sparkMinVersion.""")) + } else { + val sparkReleaseVersion = sparkReleaseStrings(1) + val parsedMinVersion = Version.apply(sparkMinVersion) + val parsedCurrentVersion = Version.apply(sparkReleaseVersion) + if (parsedCurrentVersion >= parsedMinVersion) { + info(stripMarginAndNewlines( + s"""|Apache Spark $sparkReleaseVersion detected (meets minimum + |requirement of $sparkMinVersion)""")) + } else { + error(stripMarginAndNewlines( + s"""|Apache Spark $sparkReleaseVersion detected (does not meet + |minimum requirement. Aborting.""")) + } + } + } else { + warn(stripMarginAndNewlines( + s"""|Apache Spark version information cannot be found. If you are + |using a developmental tree, please make sure you are using a + |version of at least $sparkMinVersion.""")) + } + } else { + error("Unable to locate a proper Apache Spark installation. Aborting.") + return 1 + } + info("Inspecting storage backend connections...") + try { + storage.Storage.verifyAllDataObjects() + } catch { + case e: Throwable => + error("Unable to connect to all storage backends successfully. The " + + "following shows the error message from the storage backend.") + error(s"${e.getMessage} (${e.getClass.getName})", e) + error("Dumping configuration of initialized storage backend sources. 
" + + "Please make sure they are correct.") + storage.Storage.config.get("sources") map { src => + src foreach { case (s, p) => + error(s"Source Name: $s; Type: ${p.getOrElse("type", "(error)")}; " + + s"Configuration: ${p.getOrElse("config", "(error)")}") + } + } getOrElse { + error("No properly configured storage backend sources.") + } + return 1 + } + info("(sleeping 5 seconds for all messages to show up...)") + Thread.sleep(5000) + info("Your system is all ready to go.") + 0 + } + + def upgrade(ca: ConsoleArgs): Unit = { + (ca.upgrade.from, ca.upgrade.to) match { + case ("0.8.2", "0.8.3") => { + Upgrade_0_8_3.runMain(ca.upgrade.oldAppId, ca.upgrade.newAppId) + } + case _ => + println(s"Upgrade from version ${ca.upgrade.from} to ${ca.upgrade.to}" + + s" is not supported.") + } + } + + def coreAssembly(pioHome: String): File = { + val core = s"pio-assembly-${BuildInfo.version}.jar" + val coreDir = + if (new File(pioHome + File.separator + "RELEASE").exists) { + new File(pioHome + File.separator + "lib") + } else { + new File(pioHome + File.separator + "assembly") + } + val coreFile = new File(coreDir, core) + if (coreFile.exists) { + coreFile + } else { + error(s"PredictionIO Core Assembly (${coreFile.getCanonicalPath}) does " + + "not exist. Aborting.") + sys.exit(1) + } + } + + val manifestAutogenTag = "pio-autogen-manifest" + + def regenerateManifestJson(json: File): Unit = { + val cwd = sys.props("user.dir") + val ha = java.security.MessageDigest.getInstance("SHA-1"). + digest(cwd.getBytes).map("%02x".format(_)).mkString + if (json.exists) { + val em = readManifestJson(json) + if (em.description == Some(manifestAutogenTag) && ha != em.version) { + warn("This engine project directory contains an auto-generated " + + "manifest that has been copied/moved from another location. ") + warn("Regenerating the manifest to reflect the updated location. 
" + + "This will dissociate with all previous engine instances.") + generateManifestJson(json) + } else { + info(s"Using existing engine manifest JSON at ${json.getCanonicalPath}") + } + } else { + generateManifestJson(json) + } + } + + def generateManifestJson(json: File): Unit = { + val cwd = sys.props("user.dir") + implicit val formats = Utils.json4sDefaultFormats + + new EngineManifestSerializer + val rand = Random.alphanumeric.take(32).mkString + val ha = java.security.MessageDigest.getInstance("SHA-1"). + digest(cwd.getBytes).map("%02x".format(_)).mkString + val em = EngineManifest( + id = rand, + version = ha, + name = new File(cwd).getName, + description = Some(manifestAutogenTag), + files = Seq(), + engineFactory = "") + try { + FileUtils.writeStringToFile(json, write(em), "ISO-8859-1") + } catch { + case e: java.io.IOException => + error(s"Cannot generate ${json} automatically (${e.getMessage}). " + + "Aborting.") + sys.exit(1) + } + } + + def readManifestJson(json: File): EngineManifest = { + implicit val formats = Utils.json4sDefaultFormats + + new EngineManifestSerializer + try { + read[EngineManifest](Source.fromFile(json).mkString) + } catch { + case e: java.io.FileNotFoundException => + error(s"${json.getCanonicalPath} does not exist. 
Aborting.") + sys.exit(1) + case e: MappingException => + error(s"${json.getCanonicalPath} has invalid content: " + + e.getMessage) + sys.exit(1) + } + } + + def withRegisteredManifest( + json: File, + engineId: Option[String], + engineVersion: Option[String])( + op: EngineManifest => Int): Int = { + val ej = readManifestJson(json) + val id = engineId getOrElse ej.id + val version = engineVersion getOrElse ej.version + storage.Storage.getMetaDataEngineManifests.get(id, version) map { + op + } getOrElse { + error(s"Engine ${id} ${version} cannot be found in the system.") + error("Possible reasons:") + error("- the engine is not yet built by the 'build' command;") + error("- the meta data store is offline.") + 1 + } + } + + def jarFilesAt(path: File): Array[File] = recursiveListFiles(path) filter { + _.getName.toLowerCase.endsWith(".jar") + } + + def jarFilesForScala: Array[File] = { + val libFiles = jarFilesForScalaFilter(jarFilesAt(new File("lib"))) + val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File("target" + + File.separator + s"scala-${scalaVersionNoPatch}"))) + // Use libFiles is target is empty. 
+ if (targetFiles.size > 0) targetFiles else libFiles + } + + def jarFilesForScalaFilter(jars: Array[File]): Array[File] = + jars.filterNot { f => + f.getName.toLowerCase.endsWith("-javadoc.jar") || + f.getName.toLowerCase.endsWith("-sources.jar") + } + + def recursiveListFiles(f: File): Array[File] = { + Option(f.listFiles) map { these => + these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles) + } getOrElse Array[File]() + } + + def getSparkHome(sparkHome: Option[String]): String = { + sparkHome getOrElse { + sys.env.getOrElse("SPARK_HOME", ".") + } + } + + def versionNoPatch(fullVersion: String): String = { + val v = """^(\d+\.\d+)""".r + val versionNoPatch = for { + v(np) <- v findFirstIn fullVersion + } yield np + versionNoPatch.getOrElse(fullVersion) + } + + def scalaVersionNoPatch: String = versionNoPatch(BuildInfo.scalaVersion) + + def detectSbt(ca: ConsoleArgs): String = { + ca.build.sbt map { + _.getCanonicalPath + } getOrElse { + val f = new File(Seq(ca.common.pioHome.get, "sbt", "sbt").mkString( + File.separator)) + if (f.exists) f.getCanonicalPath else "sbt" + } + } + + def stripMarginAndNewlines(string: String): String = + string.stripMargin.replaceAll("\n", " ") +}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala new file mode 100644 index 0000000..7c0dfa4 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala @@ -0,0 +1,42 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.tools.console + +import org.apache.predictionio.tools.Runner + +case class ExportArgs( + appId: Int = 0, + channel: Option[String] = None, + outputPath: String = "", + format: String = "json") + +object Export { + def eventsToFile(ca: ConsoleArgs): Int = { + val channelArg = ca.export.channel + .map(ch => Seq("--channel", ch)).getOrElse(Nil) + Runner.runOnSpark( + "org.apache.predictionio.tools.export.EventsToFile", + Seq( + "--appid", + ca.export.appId.toString, + "--output", + ca.export.outputPath, + "--format", + ca.export.format) ++ channelArg, + ca, + Nil) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala new file mode 100644 index 0000000..185aefb --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala @@ -0,0 +1,39 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.tools.console + +import org.apache.predictionio.tools.Runner + +case class ImportArgs( + appId: Int = 0, + channel: Option[String] = None, + inputPath: String = "") + +object Import { + def fileToEvents(ca: ConsoleArgs): Int = { + val channelArg = ca.imprt.channel + .map(ch => Seq("--channel", ch)).getOrElse(Nil) + Runner.runOnSpark( + "org.apache.predictionio.tools.imprt.FileToEvents", + Seq( + "--appid", + ca.imprt.appId.toString, + "--input", + ca.imprt.inputPath) ++ channelArg, + ca, + Nil) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala new file mode 100644 index 0000000..f47cacf --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala @@ -0,0 +1,429 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.tools.console + +import java.io.BufferedInputStream +import java.io.BufferedOutputStream +import java.io.File +import java.io.FileInputStream +import java.io.FileOutputStream +import java.net.ConnectException +import java.net.URI +import java.util.zip.ZipInputStream + +import grizzled.slf4j.Logging +import org.apache.predictionio.controller.Utils +import org.apache.predictionio.core.BuildInfo +import org.apache.commons.io.FileUtils +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write +import semverfi._ + +import scala.io.Source +import scala.sys.process._ +import scalaj.http._ + +case class TemplateArgs( + directory: String = "", + repository: String = "", + version: Option[String] = None, + name: Option[String] = None, + packageName: Option[String] = None, + email: Option[String] = None) + +case class GitHubTag( + name: String, + zipball_url: String, + tarball_url: String, + commit: GitHubCommit) + +case class GitHubCommit( + sha: String, + url: String) + +case class GitHubCache( + headers: Map[String, String], + body: String) + +case class TemplateEntry( + repo: String) + +case class TemplateMetaData( + pioVersionMin: Option[String] = None) + +object Template extends Logging { + implicit val formats = Utils.json4sDefaultFormats + + def templateMetaData(templateJson: File): TemplateMetaData = { + if (!templateJson.exists) { + warn(s"$templateJson does not exist. Template metadata will not be available. " + + "(This is safe to ignore if you are not working on a template.)") + TemplateMetaData() + } else { + val jsonString = Source.fromFile(templateJson)(scala.io.Codec.ISO8859).mkString + val json = try { + parse(jsonString) + } catch { + case e: org.json4s.ParserUtil.ParseException => + warn(s"$templateJson cannot be parsed. 
Template metadata will not be available.") + return TemplateMetaData() + } + val pioVersionMin = json \ "pio" \ "version" \ "min" + pioVersionMin match { + case JString(s) => TemplateMetaData(pioVersionMin = Some(s)) + case _ => TemplateMetaData() + } + } + } + + /** Creates a wrapper that provides the functionality of scalaj.http.Http() + * with automatic proxy settings handling. The proxy settings will first + * come from "git" followed by system properties "http.proxyHost" and + * "http.proxyPort". + * + * @param url URL to be connected + * @return + */ + def httpOptionalProxy(url: String): HttpRequest = { + val gitProxy = try { + Some(Process("git config --global http.proxy").lines.toList(0)) + } catch { + case e: Throwable => None + } + + val (host, port) = gitProxy map { p => + val proxyUri = new URI(p) + (Option(proxyUri.getHost), + if (proxyUri.getPort == -1) None else Some(proxyUri.getPort)) + } getOrElse { + (sys.props.get("http.proxyHost"), + sys.props.get("http.proxyPort").map { p => + try { + Some(p.toInt) + } catch { + case e: NumberFormatException => None + } + } getOrElse None) + } + + (host, port) match { + case (Some(h), Some(p)) => Http(url).proxy(h, p) + case _ => Http(url) + } + } + + def getGitHubRepos( + repos: Seq[String], + apiType: String, + repoFilename: String): Map[String, GitHubCache] = { + val reposCache = try { + val cache = + Source.fromFile(repoFilename)(scala.io.Codec.ISO8859).mkString + read[Map[String, GitHubCache]](cache) + } catch { + case e: Throwable => Map[String, GitHubCache]() + } + val newReposCache = reposCache ++ (try { + repos.map { repo => + val url = s"https://api.github.com/repos/$repo/$apiType" + val http = httpOptionalProxy(url) + val response = reposCache.get(repo).map { cache => + cache.headers.get("ETag").map { etag => + http.header("If-None-Match", etag).asString + } getOrElse { + http.asString + } + } getOrElse { + http.asString + } + + val body = if (response.code == 304) { + reposCache(repo).body + } else 
{ + response.body + } + + repo -> GitHubCache(headers = response.headers, body = body) + }.toMap + } catch { + case e: ConnectException => + githubConnectErrorMessage(e) + Map() + }) + FileUtils.writeStringToFile( + new File(repoFilename), + write(newReposCache), + "ISO-8859-1") + newReposCache + } + + def sub(repo: String, name: String, email: String, org: String): Unit = { + val data = Map( + "repo" -> repo, + "name" -> name, + "email" -> email, + "org" -> org) + try { + httpOptionalProxy("https://update.prediction.io/templates.subscribe"). + postData("json=" + write(data)).asString + } catch { + case e: Throwable => error("Unable to subscribe.") + } + } + + def meta(repo: String, name: String, org: String): Unit = { + try { + httpOptionalProxy( + s"https://meta.prediction.io/templates/$repo/$org/$name").asString + } catch { + case e: Throwable => debug("Template metadata unavailable.") + } + } + + def list(ca: ConsoleArgs): Int = { + val templatesUrl = "https://templates.prediction.io/index.json" + try { + val templatesJson = Source.fromURL(templatesUrl).mkString("") + val templates = read[List[TemplateEntry]](templatesJson) + println("The following is a list of template IDs registered on " + + "PredictionIO Template Gallery:") + println() + templates.sortBy(_.repo.toLowerCase).foreach { template => + println(template.repo) + } + println() + println("Notice that it is possible use any GitHub repository as your " + + "engine template ID (e.g. YourOrg/YourTemplate).") + 0 + } catch { + case e: Throwable => + error(s"Unable to list templates from $templatesUrl " + + s"(${e.getMessage}). Aborting.") + 1 + } + } + + def githubConnectErrorMessage(e: ConnectException): Unit = { + error(s"Unable to connect to GitHub (Reason: ${e.getMessage}). 
" + + "Please check your network configuration and proxy settings.") + } + + def get(ca: ConsoleArgs): Int = { + val repos = + getGitHubRepos(Seq(ca.template.repository), "tags", ".templates-cache") + + repos.get(ca.template.repository).map { repo => + try { + read[List[GitHubTag]](repo.body) + } catch { + case e: MappingException => + error(s"Either ${ca.template.repository} is not a valid GitHub " + + "repository, or it does not have any tag. Aborting.") + return 1 + } + } getOrElse { + error(s"Failed to retrieve ${ca.template.repository}. Aborting.") + return 1 + } + + val name = ca.template.name getOrElse { + try { + Process("git config --global user.name").lines.toList(0) + } catch { + case e: Throwable => + readLine("Please enter author's name: ") + } + } + + val organization = ca.template.packageName getOrElse { + readLine( + "Please enter the template's Scala package name (e.g. com.mycompany): ") + } + + val email = ca.template.email getOrElse { + try { + Process("git config --global user.email").lines.toList(0) + } catch { + case e: Throwable => + readLine("Please enter author's e-mail address: ") + } + } + + println(s"Author's name: $name") + println(s"Author's e-mail: $email") + println(s"Author's organization: $organization") + + var subscribe = readLine("Would you like to be informed about new bug " + + "fixes and security updates of this template? (Y/n) ") + var valid = false + + do { + subscribe match { + case "" | "Y" | "y" => + sub(ca.template.repository, name, email, organization) + valid = true + case "n" | "N" => + meta(ca.template.repository, name, organization) + valid = true + case _ => + println("Please answer 'y' or 'n'") + subscribe = readLine("(Y/n)? 
") + } + } while (!valid) + + val repo = repos(ca.template.repository) + + println(s"Retrieving ${ca.template.repository}") + val tags = read[List[GitHubTag]](repo.body) + println(s"There are ${tags.size} tags") + + if (tags.size == 0) { + println(s"${ca.template.repository} does not have any tag. Aborting.") + return 1 + } + + val tag = ca.template.version.map { v => + tags.find(_.name == v).getOrElse { + println(s"${ca.template.repository} does not have tag $v. Aborting.") + return 1 + } + } getOrElse tags.head + + println(s"Using tag ${tag.name}") + val url = + s"https://github.com/${ca.template.repository}/archive/${tag.name}.zip" + println(s"Going to download $url") + val trial = try { + httpOptionalProxy(url).asBytes + } catch { + case e: ConnectException => + githubConnectErrorMessage(e) + return 1 + } + val finalTrial = try { + trial.location.map { loc => + println(s"Redirecting to $loc") + httpOptionalProxy(loc).asBytes + } getOrElse trial + } catch { + case e: ConnectException => + githubConnectErrorMessage(e) + return 1 + } + val zipFilename = + s"${ca.template.repository.replace('/', '-')}-${tag.name}.zip" + FileUtils.writeByteArrayToFile( + new File(zipFilename), + finalTrial.body) + val zis = new ZipInputStream( + new BufferedInputStream(new FileInputStream(zipFilename))) + val bufferSize = 4096 + val filesToModify = collection.mutable.ListBuffer[String]() + var ze = zis.getNextEntry + while (ze != null) { + val filenameSegments = ze.getName.split(File.separatorChar) + val destFilename = (ca.template.directory +: filenameSegments.tail). 
+ mkString(File.separator) + if (ze.isDirectory) { + new File(destFilename).mkdirs + } else { + val os = new BufferedOutputStream( + new FileOutputStream(destFilename), + bufferSize) + val data = Array.ofDim[Byte](bufferSize) + var count = zis.read(data, 0, bufferSize) + while (count != -1) { + os.write(data, 0, count) + count = zis.read(data, 0, bufferSize) + } + os.flush() + os.close() + + val nameOnly = new File(destFilename).getName + + if (organization != "" && + (nameOnly.endsWith(".scala") || + nameOnly == "build.sbt" || + nameOnly == "engine.json")) { + filesToModify += destFilename + } + } + ze = zis.getNextEntry + } + zis.close() + new File(zipFilename).delete + + val engineJsonFile = + new File(ca.template.directory, "engine.json") + + val engineJson = try { + Some(parse(Source.fromFile(engineJsonFile).mkString)) + } catch { + case e: java.io.IOException => + error("Unable to read engine.json. Skipping automatic package " + + "name replacement.") + None + case e: MappingException => + error("Unable to parse engine.json. Skipping automatic package " + + "name replacement.") + None + } + + val engineFactory = engineJson.map { ej => + (ej \ "engineFactory").extractOpt[String] + } getOrElse None + + engineFactory.map { ef => + val pkgName = ef.split('.').dropRight(1).mkString(".") + println(s"Replacing $pkgName with $organization...") + + filesToModify.foreach { ftm => + println(s"Processing $ftm...") + val fileContent = Source.fromFile(ftm).getLines() + val processedLines = + fileContent.map(_.replaceAllLiterally(pkgName, organization)) + FileUtils.writeStringToFile( + new File(ftm), + processedLines.mkString("\n")) + } + } getOrElse { + error("engineFactory is not found in engine.json. 
Skipping automatic " + + "package name replacement.") + } + + verifyTemplateMinVersion(new File(ca.template.directory, "template.json")) + + println(s"Engine template ${ca.template.repository} is now ready at " + + ca.template.directory) + + 0 + } + + def verifyTemplateMinVersion(templateJsonFile: File): Unit = { + val metadata = templateMetaData(templateJsonFile) + + metadata.pioVersionMin.foreach { pvm => + if (Version(BuildInfo.version) < Version(pvm)) { + error(s"This engine template requires at least PredictionIO $pvm. " + + s"The template may not work with PredictionIO ${BuildInfo.version}.") + sys.exit(1) + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala new file mode 100644 index 0000000..aaafd8a --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/CorsSupport.scala @@ -0,0 +1,75 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.tools.dashboard + +// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea + +import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins} +import spray.http.HttpHeaders._ +import spray.http.HttpMethods._ +import spray.http.HttpEntity +import spray.routing._ +import spray.http.StatusCodes +import spray.http.ContentTypes + +// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS +trait CORSSupport { + this: HttpService => + + private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins) + private val optionsCorsHeaders = List( + `Access-Control-Allow-Headers`("""Origin, + |X-Requested-With, + |Content-Type, + |Accept, + |Accept-Encoding, + |Accept-Language, + |Host, + |Referer, + |User-Agent""".stripMargin.replace("\n", " ")), + `Access-Control-Max-Age`(1728000) + ) + + def cors[T]: Directive0 = mapRequestContext { ctx => + ctx.withRouteResponseHandling { + // OPTION request for a resource that responds to other methods + case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) && + x.exists(_.isInstanceOf[MethodRejection])) => { + val allowedMethods: List[HttpMethod] = x.collect { + case rejection: MethodRejection => rejection.supported + } + ctx.complete { + HttpResponse().withHeaders( + `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) :: + allowOriginHeader :: + optionsCorsHeaders + ) + } + } + }.withHttpResponseHeadersMapped { headers => + allowOriginHeader :: headers + } + } + + override def timeoutRoute: StandardRoute = complete { + HttpResponse( + StatusCodes.InternalServerError, + HttpEntity(ContentTypes.`text/plain(UTF-8)`, + "The server was not able to produce a timely response to your request."), + List(allowOriginHeader) + ) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala 
---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala new file mode 100644 index 0000000..bfd7c64 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala @@ -0,0 +1,156 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.predictionio.tools.dashboard

import com.typesafe.config.ConfigFactory
import org.apache.predictionio.authentication.KeyAuthentication
import org.apache.predictionio.configuration.SSLConfiguration
import org.apache.predictionio.data.storage.Storage
import spray.can.server.ServerSettings
import spray.routing.directives.AuthMagnet
import scala.concurrent.{Future, ExecutionContext}
import akka.actor.{ActorContext, Actor, ActorSystem, Props}
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import com.github.nscala_time.time.Imports.DateTime
import grizzled.slf4j.Logging
import spray.can.Http
import spray.http._
import spray.http.MediaTypes._
import spray.routing._
import spray.routing.authentication.{Authentication, UserPass, BasicAuth}

import scala.concurrent.duration._

/** Command-line configuration for the dashboard HTTP server.
  *
  * @param ip   interface to bind to (default: localhost)
  * @param port port to bind to (default: 9000)
  */
case class DashboardConfig(
  ip: String = "localhost",
  port: Int = 9000)

/** Entry point that parses CLI flags and starts the evaluation dashboard
  * as an SSL-enabled spray-can HTTP server.
  */
object Dashboard extends Logging with SSLConfiguration{
  def main(args: Array[String]): Unit = {
    // scopt parser for --ip / --port; each option returns an updated copy of the config.
    val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") {
      opt[String]("ip") action { (x, c) =>
        c.copy(ip = x)
      } text("IP to bind to (default: localhost).")
      opt[Int]("port") action { (x, c) =>
        c.copy(port = x)
      } text("Port to bind to (default: 9000).")
    }

    // parse returns None on bad arguments (scopt already printed usage); only
    // start the server when parsing succeeded.
    parser.parse(args, DashboardConfig()) map { dc =>
      createDashboard(dc)
    }
  }

  /** Boots the actor system, binds the dashboard service actor to the
    * configured interface/port with SSL encryption on, and blocks until the
    * actor system terminates.
    */
  def createDashboard(dc: DashboardConfig): Unit = {
    implicit val system = ActorSystem("pio-dashboard")
    val service =
      system.actorOf(Props(classOf[DashboardActor], dc), "dashboard")
    implicit val timeout = Timeout(5.seconds)
    val settings = ServerSettings(system)
    // Fire-and-forget bind: the ask result is deliberately ignored; a bind
    // failure would surface in the logs rather than here.
    IO(Http) ? Http.Bind(
      service,
      interface = dc.ip,
      port = dc.port,
      settings = Some(settings.copy(sslEncryption = true)))
    // Keep the JVM alive until the actor system shuts down.
    system.awaitTermination
  }
}

/** Actor wrapper that exposes [[DashboardService]]'s spray route. */
class DashboardActor(
    val dc: DashboardConfig)
  extends Actor with DashboardService {
  def actorRefFactory: ActorContext = context
  def receive: Actor.Receive = runRoute(dashboardRoute)
}

/** spray-routing service serving the dashboard HTML page and per-instance
  * evaluator results in txt/html/json form. Access is guarded by the
  * key-file-based authenticator from KeyAuthentication.
  */
trait DashboardService extends HttpService with KeyAuthentication with CORSSupport {

  implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher
  // Supplied by the concrete actor (DashboardActor's constructor argument).
  val dc: DashboardConfig
  val evaluationInstances = Storage.getMetaDataEvaluationInstances
  // Only PIO_* environment variables are shown on the dashboard page.
  val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_"))
  val serverStartTime = DateTime.now
  val dashboardRoute =
    // GET / : authenticated landing page listing completed evaluation instances.
    path("") {
      authenticate(withAccessKeyFromFile) { request =>
        get {
          respondWithMediaType(`text/html`) {
            complete {
              val completedInstances = evaluationInstances.getCompleted
              // Rendered by the twirl template html.index.
              html.index(
                dc,
                serverStartTime,
                pioEnvVars,
                completedInstances).toString
            }
          }
        }
      }
    } ~
    // GET /engine_instances/<id>/evaluator_results.{txt,html,json}
    // Each variant looks up the instance and 404s when it does not exist.
    pathPrefix("engine_instances" / Segment) { instanceId =>
      path("evaluator_results.txt") {
        get {
          respondWithMediaType(`text/plain`) {
            evaluationInstances.get(instanceId).map { i =>
              complete(i.evaluatorResults)
            } getOrElse {
              complete(StatusCodes.NotFound)
            }
          }
        }
      } ~
      path("evaluator_results.html") {
        get {
          respondWithMediaType(`text/html`) {
            evaluationInstances.get(instanceId).map { i =>
              complete(i.evaluatorResultsHTML)
            } getOrElse {
              complete(StatusCodes.NotFound)
            }
          }
        }
      } ~
      path("evaluator_results.json") {
        get {
          respondWithMediaType(`application/json`) {
            evaluationInstances.get(instanceId).map { i =>
              complete(i.evaluatorResultsJSON)
            } getOrElse {
              complete(StatusCodes.NotFound)
            }
          }
        }
      } ~
      // Same JSON payload but wrapped in CORS headers so a locally served
      // page on another origin can fetch it.
      cors {
        path("local_evaluator_results.json") {
          get {
            respondWithMediaType(`application/json`) {
              evaluationInstances.get(instanceId).map { i =>
                complete(i.evaluatorResultsJSON)
              } getOrElse {
                complete(StatusCodes.NotFound)
              }
            }
          }
        }
      }
    } ~
    // Static assets (css/js/images) bundled on the classpath.
    pathPrefix("assets") {
      getFromResourceDirectory("assets")
    }
}
package org.apache.predictionio.tools.export

import org.apache.predictionio.controller.Utils
import org.apache.predictionio.data.storage.EventJson4sSupport
import org.apache.predictionio.data.storage.Storage
import org.apache.predictionio.tools.Runner
import org.apache.predictionio.workflow.WorkflowContext
import org.apache.predictionio.workflow.WorkflowUtils

import grizzled.slf4j.Logging
import org.apache.spark.sql.SQLContext
import org.json4s.native.Serialization._

/** Command-line arguments for the event export job. */
case class EventsToFileArgs(
  env: String = "",
  logFile: String = "",
  appId: Int = 0,
  channel: Option[String] = None,
  outputPath: String = "",
  format: String = "parquet",
  verbose: Boolean = false,
  debug: Boolean = false)

/** Exports all events of an app (optionally restricted to one channel) to
  * either JSON text files or Parquet files via a Spark job.
  */
object EventsToFile extends Logging {
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") {
      opt[String]("env") action { (v, c) => c.copy(env = v) }
      opt[String]("log-file") action { (v, c) => c.copy(logFile = v) }
      opt[Int]("appid") action { (v, c) => c.copy(appId = v) }
      opt[String]("channel") action { (v, c) => c.copy(channel = Some(v)) }
      opt[String]("format") action { (v, c) => c.copy(format = v) }
      opt[String]("output") action { (v, c) => c.copy(outputPath = v) }
      opt[Unit]("verbose") action { (_, c) => c.copy(verbose = true) }
      opt[Unit]("debug") action { (_, c) => c.copy(debug = true) }
    }
    // scopt yields None on bad arguments; the job runs only on success.
    parser.parse(args, EventsToFileArgs()) map { parsed =>
      // Resolve the optional channel name to its numeric ID; exit if unknown.
      val channelMap = Storage.getMetaDataChannels
        .getByAppid(parsed.appId)
        .map(c => (c.name, c.id))
        .toMap
      val channelId: Option[Int] = parsed.channel.map { ch =>
        if (!channelMap.contains(ch)) {
          error(s"Channel ${ch} doesn't exist in this app.")
          sys.exit(1)
        }
        channelMap(ch)
      }

      // Human-readable suffix for the Spark batch name.
      val channelStr = parsed.channel.map(n => " Channel " + n).getOrElse("")

      WorkflowUtils.modifyLogging(verbose = parsed.verbose)
      // json4s formats for event serialization; transient+lazy so Spark does
      // not try to ship the value itself to executors.
      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
        new EventJson4sSupport.APISerializer

      val sc = WorkflowContext(
        mode = "Export",
        batch = "App ID " + parsed.appId + channelStr,
        executorEnv = Runner.envStringToMap(parsed.env))
      val sqlContext = new SQLContext(sc)

      // Serialize each event to its JSON API representation.
      val jsonStringRdd = Storage.getPEvents()
        .find(appId = parsed.appId, channelId = channelId)(sc)
        .map(write(_))

      if (parsed.format == "json") {
        jsonStringRdd.saveAsTextFile(parsed.outputPath)
      } else {
        // Any format other than "json" falls back to Parquet (the default).
        sqlContext.jsonRDD(jsonStringRdd).saveAsParquetFile(parsed.outputPath)
      }
      info(s"Events are exported to ${parsed.outputPath}/.")
      info("Done.")
    }
  }
}
package org.apache.predictionio.tools.imprt

import org.apache.predictionio.controller.Utils
import org.apache.predictionio.data.storage.Event
import org.apache.predictionio.data.storage.EventJson4sSupport
import org.apache.predictionio.data.storage.Storage
import org.apache.predictionio.tools.Runner
import org.apache.predictionio.workflow.WorkflowContext
import org.apache.predictionio.workflow.WorkflowUtils

import grizzled.slf4j.Logging
import org.json4s.native.Serialization._

import scala.util.{Failure, Try}

/** Command-line arguments for the event import job. */
case class FileToEventsArgs(
  env: String = "",
  logFile: String = "",
  appId: Int = 0,
  channel: Option[String] = None,
  inputPath: String = "",
  verbose: Boolean = false,
  debug: Boolean = false)

/** Imports events from newline-delimited JSON files into an app's event
  * store (optionally into a specific channel) via a Spark job.
  */
object FileToEvents extends Logging {
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[FileToEventsArgs]("FileToEvents") {
      opt[String]("env") action { (v, c) => c.copy(env = v) }
      opt[String]("log-file") action { (v, c) => c.copy(logFile = v) }
      opt[Int]("appid") action { (v, c) => c.copy(appId = v) }
      opt[String]("channel") action { (v, c) => c.copy(channel = Some(v)) }
      opt[String]("input") action { (v, c) => c.copy(inputPath = v) }
      opt[Unit]("verbose") action { (_, c) => c.copy(verbose = true) }
      opt[Unit]("debug") action { (_, c) => c.copy(debug = true) }
    }
    // scopt yields None on bad arguments; the job runs only on success.
    parser.parse(args, FileToEventsArgs()) map { parsed =>
      // Resolve the optional channel name to its numeric ID; exit if unknown.
      val channelMap = Storage.getMetaDataChannels
        .getByAppid(parsed.appId)
        .map(c => (c.name, c.id))
        .toMap
      val channelId: Option[Int] = parsed.channel.map { ch =>
        if (!channelMap.contains(ch)) {
          error(s"Channel ${ch} doesn't exist in this app.")
          sys.exit(1)
        }
        channelMap(ch)
      }

      // Human-readable suffix for the Spark batch name.
      val channelStr = parsed.channel.map(n => " Channel " + n).getOrElse("")

      WorkflowUtils.modifyLogging(verbose = parsed.verbose)
      // json4s formats for event deserialization; transient+lazy so Spark
      // does not try to ship the value itself to executors.
      @transient lazy implicit val formats = Utils.json4sDefaultFormats +
        new EventJson4sSupport.APISerializer

      val sc = WorkflowContext(
        mode = "Import",
        batch = "App ID " + parsed.appId + channelStr,
        executorEnv = Runner.envStringToMap(parsed.env))

      // Parse every non-blank line; log the offending line and rethrow on
      // malformed JSON so the job fails loudly rather than dropping data.
      val rdd = sc.textFile(parsed.inputPath).filter(_.trim.nonEmpty).map { json =>
        Try(read[Event](json)).recoverWith {
          case e: Throwable =>
            error(s"\nmalformed json => $json")
            Failure(e)
        }.get
      }
      Storage.getPEvents().write(
        events = rdd,
        appId = parsed.appId,
        channelId = channelId)(sc)
      info("Events are imported.")
      info("Done.")
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt ---------------------------------------------------------------------- diff --git a/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt b/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt deleted file mode 100644 index 4ec0237..0000000 --- a/tools/src/main/twirl/io/prediction/tools/console/adminserver.scala.txt +++ /dev/null @@ -1,6 +0,0 @@ -(Experimental Only!) Usage: pio adminserver [--ip <value>] [--port <value>] - - --ip <value> - IP to bind to. Default: localhost - --port <value> - Port to bind to. Default: 7071 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt ---------------------------------------------------------------------- diff --git a/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt b/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt deleted file mode 100644 index 49f21b1..0000000 --- a/tools/src/main/twirl/io/prediction/tools/console/app.scala.txt +++ /dev/null @@ -1,74 +0,0 @@ -Usage: pio app new [--id <value>] [--description <value>] [--access-key <value>] - <name> - -Create a new app key to app ID mapping. - - --id <value> - Specify this if you already have data under an app ID. - --description <value> - Description of the new app. - --access-key <value> - Specify a custom default access key. - <name> - App name. - - -Usage: pio app list - -List all apps. - - -Usage: pio app show <name> - -Show details of an app. - - <name> - App name. - - -Usage: pio app delete <name> [--force] - -Name of the app to be deleted. - - <name> - App name. - --force, -f - Delete data without prompting for confirmation - - -Usage: pio app data-delete <name> [--channel <name>] [--all] [--force] - -Delete data of an app. - - <name> - App name. 
- --channel <name> - Delete data of the specified channel (default channel if not specified) - --all - Delete all data of this app (including both default and all channels) - --force, -f - Delete data without prompting for confirmation - - -Usage: pio app channel-new <name> <channel> - -Create a new channel for the app. - - <name> - App name. - - <channel> - Channel name to be created. - - -Usage: pio app channel-delete <name> <channel> [--force] - -Delete a channel for the app. - - <name> - App name. - - <channel> - Channel name to be deleted. - --force, -f - Delete data without prompting for confirmation http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt ---------------------------------------------------------------------- diff --git a/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt b/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt deleted file mode 100644 index be80c50..0000000 --- a/tools/src/main/twirl/io/prediction/tools/console/build.scala.txt +++ /dev/null @@ -1,11 +0,0 @@ -Usage: pio build [--sbt-extra <value>] [--clean] [--no-asm] - [common options...] - -Build an engine at the current directory. - - --sbt-extra <value> - Extra command to pass to SBT when it builds your engine. - --clean - Clean build. - --no-asm - Skip building external dependencies assembly.
