http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala deleted file mode 100644 index 53b1b40..0000000 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala +++ /dev/null @@ -1,540 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.tools.console - -import org.apache.predictionio.data.storage - -import grizzled.slf4j.Logging - -case class AppArgs( - id: Option[Int] = None, - name: String = "", - channel: String = "", - dataDeleteChannel: Option[String] = None, - all: Boolean = false, - force: Boolean = false, - description: Option[String] = None) - -object App extends Logging { - def create(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps() - // get the client in the beginning so error exit right away if can't access client - val events = storage.Storage.getLEvents() - apps.getByName(ca.app.name) map { app => - error(s"App ${ca.app.name} already exists. Aborting.") - 1 - } getOrElse { - ca.app.id.map { id => - apps.get(id) map { app => - error( - s"App ID ${id} already exists and maps to the app '${app.name}'. " + - "Aborting.") - return 1 - } - } - val appid = apps.insert(storage.App( - id = ca.app.id.getOrElse(0), - name = ca.app.name, - description = ca.app.description)) - appid map { id => - val dbInit = events.init(id) - val r = if (dbInit) { - info(s"Initialized Event Store for this app ID: ${id}.") - val accessKeys = storage.Storage.getMetaDataAccessKeys - val accessKey = accessKeys.insert(storage.AccessKey( - key = ca.accessKey.accessKey, - appid = id, - events = Seq())) - accessKey map { k => - info("Created new app:") - info(s" Name: ${ca.app.name}") - info(s" ID: ${id}") - info(s"Access Key: ${k}") - 0 - } getOrElse { - error(s"Unable to create new access key.") - 1 - } - } else { - error(s"Unable to initialize Event Store for this app ID: ${id}.") - // revert back the meta data change - try { - apps.delete(id) - 0 - } catch { - case e: Exception => - error(s"Failed to revert back the App meta-data change.", e) - error(s"The app ${ca.app.name} CANNOT be used!") - error(s"Please run 'pio app delete ${ca.app.name}' " + - "to delete this app!") - 1 - } - } - events.close() - r - } getOrElse { - error(s"Unable to create new app.") - 1 - } - } - } - - def list(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps.getAll().sortBy(_.name) - val accessKeys = storage.Storage.getMetaDataAccessKeys - val title = "Name" - val ak = "Access Key" - info(f"$title%20s | ID | $ak%64s | Allowed Event(s)") - apps foreach { app => - val keys = accessKeys.getByAppid(app.id) - keys foreach { k => - val events = - if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" - info(f"${app.name}%20s | ${app.id}%4d | ${k.key}%64s | $events%s") - } - } - info(s"Finished listing ${apps.size} app(s).") - 0 - } - - def show(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps - val accessKeys = storage.Storage.getMetaDataAccessKeys - val channels = storage.Storage.getMetaDataChannels - apps.getByName(ca.app.name) map { app => - info(s" App Name: ${app.name}") - info(s" App ID: ${app.id}") - info(s" Description: ${app.description.getOrElse("")}") - val keys = accessKeys.getByAppid(app.id) - - var firstKey = true - keys foreach { k => - val events = - if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" - if (firstKey) { - info(f" Access Key: ${k.key}%s | ${events}%s") - firstKey = false - } else { - info(f" ${k.key}%s | ${events}%s") - } - } - - val chans = channels.getByAppid(app.id) - var firstChan = true - val titleName = "Channel Name" - val titleID = "Channel ID" - chans.foreach { ch => - if (firstChan) { - info(f" Channels: ${titleName}%16s | ${titleID}%10s ") - firstChan = false - } - info(f" ${ch.name}%16s | ${ch.id}%10s") - } - 0 - } getOrElse { - error(s"App ${ca.app.name} does not exist. Aborting.") - 1 - } - } - - def delete(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps - val accesskeys = storage.Storage.getMetaDataAccessKeys - val channels = storage.Storage.getMetaDataChannels - val events = storage.Storage.getLEvents() - val status = apps.getByName(ca.app.name) map { app => - info(s"The following app (including all channels) will be deleted. Are you sure?") - info(s" App Name: ${app.name}") - info(s" App ID: ${app.id}") - info(s" Description: ${app.description.getOrElse("")}") - val chans = channels.getByAppid(app.id) - var firstChan = true - val titleName = "Channel Name" - val titleID = "Channel ID" - chans.foreach { ch => - if (firstChan) { - info(f" Channels: ${titleName}%16s | ${titleID}%10s ") - firstChan = false - } - info(f" ${ch.name}%16s | ${ch.id}%10s") - } - - val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") - choice match { - case "YES" => { - // delete channels - val delChannelStatus: Seq[Int] = chans.map { ch => - if (events.remove(app.id, Some(ch.id))) { - info(s"Removed Event Store of the channel ID: ${ch.id}") - try { - channels.delete(ch.id) - info(s"Deleted channel ${ch.name}") - 0 - } catch { - case e: Exception => - error(s"Error deleting channel ${ch.name}.", e) - 1 - } - } else { - error(s"Error removing Event Store of the channel ID: ${ch.id}.") - return 1 - } - } - - if (delChannelStatus.exists(_ != 0)) { - error("Error occurred while deleting channels. Aborting.") - return 1 - } - - try { - events.remove(app.id) - info(s"Removed Event Store for this app ID: ${app.id}") - } catch { - case e: Exception => - error(s"Error removing Event Store for this app. Aborting.", e) - return 1 - } - - accesskeys.getByAppid(app.id) foreach { key => - try { - accesskeys.delete(key.key) - info(s"Removed access key ${key.key}") - } catch { - case e: Exception => - error(s"Error removing access key ${key.key}. Aborting.", e) - return 1 - } - } - - try { - apps.delete(app.id) - info(s"Deleted app ${app.name}.") - } catch { - case e: Exception => - error(s"Error deleting app ${app.name}. Aborting.", e) - return 1 - } - - info("Done.") - 0 - } - case _ => - info("Aborted.") - 0 - } - } getOrElse { - error(s"App ${ca.app.name} does not exist. Aborting.") - 1 - } - events.close() - status - } - - def dataDelete(ca: ConsoleArgs): Int = { - if (ca.app.all) { - dataDeleteAll(ca) - } else { - dataDeleteOne(ca) - } - } - - def dataDeleteOne(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps - val channels = storage.Storage.getMetaDataChannels - apps.getByName(ca.app.name) map { app => - - val channelId = ca.app.dataDeleteChannel.map { ch => - val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap - if (!channelMap.contains(ch)) { - error(s"Unable to delete data for channel.") - error(s"Channel ${ch} doesn't exist.") - return 1 - } - - channelMap(ch) - } - - if (channelId.isDefined) { - info(s"Data of the following channel will be deleted. Are you sure?") - info(s"Channel Name: ${ca.app.dataDeleteChannel.get}") - info(s" Channel ID: ${channelId.get}") - info(s" App Name: ${app.name}") - info(s" App ID: ${app.id}") - info(s" Description: ${app.description}") - } else { - info(s"Data of the following app (default channel only) will be deleted. Are you sure?") - info(s" App Name: ${app.name}") - info(s" App ID: ${app.id}") - info(s" Description: ${app.description}") - } - - val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") - - choice match { - case "YES" => { - val events = storage.Storage.getLEvents() - // remove table - val r1 = if (events.remove(app.id, channelId)) { - if (channelId.isDefined) { - info(s"Removed Event Store for this channel ID: ${channelId.get}") - } else { - info(s"Removed Event Store for this app ID: ${app.id}") - } - 0 - } else { - if (channelId.isDefined) { - error(s"Error removing Event Store for this channel.") - } else { - error(s"Error removing Event Store for this app.") - } - 1 - } - // re-create table - val dbInit = events.init(app.id, channelId) - val r2 = if (dbInit) { - if (channelId.isDefined) { - info(s"Initialized Event Store for this channel ID: ${channelId.get}.") - } else { - info(s"Initialized Event Store for this app ID: ${app.id}.") - } - 0 - } else { - if (channelId.isDefined) { - error(s"Unable to initialize Event Store for this channel ID:" + - s" ${channelId.get}.") - } else { - error(s"Unable to initialize Event Store for this appId:" + - s" ${app.id}.") - } - 1 - } - events.close() - info("Done.") - r1 + r2 - } - case _ => - info("Aborted.") - 0 - } - } getOrElse { - error(s"App ${ca.app.name} does not exist. Aborting.") - 1 - } - } - - def dataDeleteAll(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps - val channels = storage.Storage.getMetaDataChannels - val events = storage.Storage.getLEvents() - val status = apps.getByName(ca.app.name) map { app => - info(s"All data of the app (including default and all channels) will be deleted." + - " Are you sure?") - info(s" App Name: ${app.name}") - info(s" App ID: ${app.id}") - info(s" Description: ${app.description}") - val chans = channels.getByAppid(app.id) - var firstChan = true - val titleName = "Channel Name" - val titleID = "Channel ID" - chans.foreach { ch => - if (firstChan) { - info(f" Channels: ${titleName}%16s | ${titleID}%10s ") - firstChan = false - } - info(f" ${ch.name}%16s | ${ch.id}%10s") - } - - val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") - choice match { - case "YES" => { - // delete channels - val delChannelStatus: Seq[Int] = chans.map { ch => - val r1 = if (events.remove(app.id, Some(ch.id))) { - info(s"Removed Event Store of the channel ID: ${ch.id}") - 0 - } else { - error(s"Error removing Event Store of the channel ID: ${ch.id}.") - 1 - } - // re-create table - val dbInit = events.init(app.id, Some(ch.id)) - val r2 = if (dbInit) { - info(s"Initialized Event Store of the channel ID: ${ch.id}") - 0 - } else { - error(s"Unable to initialize Event Store of the channel ID: ${ch.id}.") - 1 - } - r1 + r2 - } - - if (delChannelStatus.filter(_ != 0).isEmpty) { - val r1 = if (events.remove(app.id)) { - info(s"Removed Event Store for this app ID: ${app.id}") - 0 - } else { - error(s"Error removing Event Store for this app.") - 1 - } - - val dbInit = events.init(app.id) - val r2 = if (dbInit) { - info(s"Initialized Event Store for this app ID: ${app.id}.") - 0 - } else { - error(s"Unable to initialize Event Store for this appId: ${app.id}.") - 1 - } - info("Done.") - r1 + r2 - } else 1 - } - case _ => - info("Aborted.") - 0 - } - } getOrElse { - error(s"App ${ca.app.name} does not exist. Aborting.") - 1 - } - events.close() - status - } - - def channelNew(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps - val channels = storage.Storage.getMetaDataChannels - val events = storage.Storage.getLEvents() - val newChannel = ca.app.channel - val status = apps.getByName(ca.app.name) map { app => - val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap - if (channelMap.contains(newChannel)) { - error(s"Unable to create new channel.") - error(s"Channel ${newChannel} already exists.") - 1 - } else if (!storage.Channel.isValidName(newChannel)) { - error(s"Unable to create new channel.") - error(s"The channel name ${newChannel} is invalid.") - error(s"${storage.Channel.nameConstraint}") - 1 - } else { - - val channelId = channels.insert(storage.Channel( - id = 0, // new id will be assigned - appid = app.id, - name = newChannel - )) - channelId.map { chanId => - info(s"Updated Channel meta-data.") - // initialize storage - val dbInit = events.init(app.id, Some(chanId)) - if (dbInit) { - info(s"Initialized Event Store for the channel: ${newChannel}.") - info(s"Created new channel:") - info(s" Channel Name: ${newChannel}") - info(s" Channel ID: ${chanId}") - info(s" App ID: ${app.id}") - 0 - } else { - error(s"Unable to create new channel.") - error(s"Failed to initalize Event Store.") - // reverted back the meta data - try { - channels.delete(chanId) - 0 - } catch { - case e: Exception => - error(s"Failed to revert back the Channel meta-data change.", e) - error(s"The channel ${newChannel} CANNOT be used!") - error(s"Please run 'pio app channel-delete ${app.name} ${newChannel}' " + - "to delete this channel!") - 1 - } - } - }.getOrElse { - error(s"Unable to create new channel.") - error(s"Failed to update Channel meta-data.") - 1 - } - } - } getOrElse { - error(s"App ${ca.app.name} does not exist. Aborting.") - 1 - } - events.close() - status - } - - def channelDelete(ca: ConsoleArgs): Int = { - val apps = storage.Storage.getMetaDataApps - val channels = storage.Storage.getMetaDataChannels - val events = storage.Storage.getLEvents() - val deleteChannel = ca.app.channel - val status = apps.getByName(ca.app.name) map { app => - val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap - if (!channelMap.contains(deleteChannel)) { - error(s"Unable to delete channel.") - error(s"Channel ${deleteChannel} doesn't exist.") - 1 - } else { - info(s"The following channel will be deleted. Are you sure?") - info(s" Channel Name: ${deleteChannel}") - info(s" Channel ID: ${channelMap(deleteChannel)}") - info(s" App Name: ${app.name}") - info(s" App ID: ${app.id}") - val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") - choice match { - case "YES" => { - // NOTE: remove storage first before remove meta data (in case remove storage failed) - val dbRemoved = events.remove(app.id, Some(channelMap(deleteChannel))) - if (dbRemoved) { - info(s"Removed Event Store for this channel: ${deleteChannel}") - try { - channels.delete(channelMap(deleteChannel)) - info(s"Deleted channel: ${deleteChannel}.") - 0 - } catch { - case e: Exception => - error(s"Unable to delete channel.", e) - error(s"Failed to update Channel meta-data.") - error(s"The channel ${deleteChannel} CANNOT be used!") - error(s"Please run 'pio app channel-delete ${app.name} ${deleteChannel}' " + - "to delete this channel again!") - 1 - } - } else { - error(s"Unable to delete channel.") - error(s"Error removing Event Store for this channel.") - 1 - } - } - case _ => - info("Aborted.") - 0 - } - } - } getOrElse { - error(s"App ${ca.app.name} does not exist. Aborting.") - 1 - } - events.close() - status - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala index 5646467..195740b 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala @@ -29,10 +29,16 @@ import org.apache.predictionio.data.api.EventServerConfig import org.apache.predictionio.data.storage import org.apache.predictionio.data.storage.EngineManifest import org.apache.predictionio.data.storage.EngineManifestSerializer -import org.apache.predictionio.data.storage.hbase.upgrade.Upgrade_0_8_3 import org.apache.predictionio.tools.RegisterEngine import org.apache.predictionio.tools.RunServer import org.apache.predictionio.tools.RunWorkflow +import org.apache.predictionio.tools.Common +import org.apache.predictionio.tools.commands.{ + DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs, + BuildArgs, EngineArgs} +import org.apache.predictionio.tools.{ + EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs} +import org.apache.predictionio.tools.EventServerArgs import org.apache.predictionio.tools.admin.AdminServer import org.apache.predictionio.tools.admin.AdminServerConfig import org.apache.predictionio.tools.dashboard.Dashboard @@ -40,6 +46,7 @@ import org.apache.predictionio.tools.dashboard.DashboardConfig import org.apache.predictionio.workflow.JsonExtractorOption import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.predictionio.tools.commands import org.apache.commons.io.FileUtils import org.json4s._ import org.json4s.native.JsonMethods._ @@ -54,82 +61,39 @@ import scala.util.Random import scalaj.http.Http case class ConsoleArgs( - common: CommonArgs = CommonArgs(), build: BuildArgs = BuildArgs(), app: AppArgs = AppArgs(), + spark: SparkArgs = SparkArgs(), + engine: EngineArgs = EngineArgs(), + workflow: WorkflowArgs = WorkflowArgs(), accessKey: AccessKeyArgs = AccessKeyArgs(), deploy: DeployArgs = DeployArgs(), eventServer: EventServerArgs = EventServerArgs(), adminServer: AdminServerArgs = AdminServerArgs(), dashboard: DashboardArgs = DashboardArgs(), - upgrade: UpgradeArgs = UpgradeArgs(), - template: TemplateArgs = TemplateArgs(), export: ExportArgs = ExportArgs(), imprt: ImportArgs = ImportArgs(), commands: Seq[String] = Seq(), - metricsClass: Option[String] = None, metricsParamsJsonPath: Option[String] = None, paramsPath: String = "params", engineInstanceId: Option[String] = None, - mainClass: Option[String] = None) - -case class CommonArgs( - batch: String = "", - sparkPassThrough: Seq[String] = Seq(), + mainClass: Option[String] = None, driverPassThrough: Seq[String] = Seq(), pioHome: Option[String] = None, - sparkHome: Option[String] = None, - engineId: Option[String] = None, - engineVersion: Option[String] = None, - engineFactory: Option[String] = None, - engineParamsKey: Option[String] = None, - evaluation: Option[String] = None, - engineParamsGenerator: Option[String] = None, - variantJson: File = new File("engine.json"), - manifestJson: File = new File("manifest.json"), - stopAfterRead: Boolean = false, - stopAfterPrepare: Boolean = false, - skipSanityCheck: Boolean = false, - verbose: Boolean = false, - verbosity: Int = 0, - sparkKryo: Boolean = false, - scratchUri: Option[URI] = None, - jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) - -case class BuildArgs( - sbt: Option[File] = None, - sbtExtra: Option[String] = None, - sbtAssemblyPackageDependency: Boolean = true, - sbtClean: Boolean = false, - uberJar: Boolean = false, - forceGeneratePIOSbt: Boolean = false) - -case class DeployArgs( - ip: String = "0.0.0.0", - port: Int = 8000, - logUrl: Option[String] = None, - logPrefix: Option[String] = None) - -case class EventServerArgs( - enabled: Boolean = false, - ip: String = "0.0.0.0", - port: Int = 7070, - stats: Boolean = false) - -case class AdminServerArgs( -ip: String = "127.0.0.1", -port: Int = 7071) + verbose: Boolean = false) -case class DashboardArgs( - ip: String = "127.0.0.1", - port: Int = 9000) +case class AppArgs( + id: Option[Int] = None, + name: String = "", + channel: String = "", + dataDeleteChannel: Option[String] = None, + all: Boolean = false, + force: Boolean = false, + description: Option[String] = None) -case class UpgradeArgs( - from: String = "0.0.0", - to: String = "0.0.0", - oldAppId: Int = 0, - newAppId: Int = 0 -) +case class AccessKeyArgs( + accessKey: String = "", + events: Seq[String] = Seq()) object Console extends Logging { def main(args: Array[String]): Unit = { @@ -145,27 +109,27 @@ object Console extends Logging { "for each command for more information.\n\n" + "The following options are common to all commands:\n") opt[String]("pio-home") action { (x, c) => - c.copy(common = c.common.copy(pioHome = Some(x))) + c.copy(pioHome = Some(x)) } text("Root directory of a PredictionIO installation.\n" + " Specify this if automatic discovery fail.") opt[String]("spark-home") action { (x, c) => - c.copy(common = c.common.copy(sparkHome = Some(x))) + c.copy(spark = c.spark.copy(sparkHome = Some(x))) } text("Root directory of an Apache Spark installation.\n" + " If not specified, will try to use the SPARK_HOME\n" + " environmental variable. If this fails as well, default to\n" + " current directory.") opt[String]("engine-id") abbr("ei") action { (x, c) => - c.copy(common = c.common.copy(engineId = Some(x))) + c.copy(engine = c.engine.copy(engineId = Some(x))) } text("Specify an engine ID. Usually used by distributed deployment.") opt[String]("engine-version") abbr("ev") action { (x, c) => - c.copy(common = c.common.copy(engineVersion = Some(x))) + c.copy(engine = c.engine.copy(engineVersion = Some(x))) } text("Specify an engine version. Usually used by distributed " + "deployment.") opt[File]("variant") abbr("v") action { (x, c) => - c.copy(common = c.common.copy(variantJson = x)) + c.copy(workflow = c.workflow.copy(variantJson = x)) } opt[File]("manifest") abbr("m") action { (x, c) => - c.copy(common = c.common.copy(manifestJson = x)) + c.copy(engine = c.engine.copy(manifestJson = x)) } opt[File]("sbt") action { (x, c) => c.copy(build = c.build.copy(sbt = Some(x))) @@ -177,13 +141,13 @@ object Console extends Logging { } } text("Path to sbt. Default: sbt") opt[Unit]("verbose") action { (x, c) => - c.copy(common = c.common.copy(verbose = true)) + c.copy(verbose = true) } opt[Unit]("spark-kryo") abbr("sk") action { (x, c) => - c.copy(common = c.common.copy(sparkKryo = true)) + c.copy(spark = c.spark.copy(sparkKryo = true)) } opt[String]("scratch-uri") action { (x, c) => - c.copy(common = c.common.copy(scratchUri = Some(new URI(x)))) + c.copy(spark = c.spark.copy(scratchUri = Some(new URI(x)))) } note("") cmd("version"). @@ -237,7 +201,7 @@ object Console extends Logging { c.copy(commands = c.commands :+ "train") } children( opt[String]("batch") action { (x, c) => - c.copy(common = c.common.copy(batch = x)) + c.copy(workflow = c.workflow.copy(batch = x)) } text("Batch label of the run."), opt[String]("params-path") action { (x, c) => c.copy(paramsPath = x) @@ -247,28 +211,28 @@ object Console extends Logging { } text("Metrics parameters JSON file. Will try to use\n" + " metrics.json in the base path."), opt[Unit]("skip-sanity-check") abbr("ssc") action { (x, c) => - c.copy(common = c.common.copy(skipSanityCheck = true)) + c.copy(workflow = c.workflow.copy(skipSanityCheck = true)) }, opt[Unit]("stop-after-read") abbr("sar") action { (x, c) => - c.copy(common = c.common.copy(stopAfterRead = true)) + c.copy(workflow = c.workflow.copy(stopAfterRead = true)) }, opt[Unit]("stop-after-prepare") abbr("sap") action { (x, c) => - c.copy(common = c.common.copy(stopAfterPrepare = true)) + c.copy(workflow = c.workflow.copy(stopAfterPrepare = true)) }, opt[Unit]("uber-jar") action { (x, c) => c.copy(build = c.build.copy(uberJar = true)) }, opt[Int]("verbosity") action { (x, c) => - c.copy(common = c.common.copy(verbosity = x)) + c.copy(workflow = c.workflow.copy(verbosity = x)) }, opt[String]("engine-factory") action { (x, c) => - c.copy(common = c.common.copy(engineFactory = Some(x))) + c.copy(workflow = c.workflow.copy(engineFactory = Some(x))) }, opt[String]("engine-params-key") action { (x, c) => - c.copy(common = c.common.copy(engineParamsKey = Some(x))) + c.copy(workflow = c.workflow.copy(engineParamsKey = Some(x))) }, opt[String]("json-extractor") action { (x, c) => - c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x))) + c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x))) } validate { x => if (JsonExtractorOption.values.map(_.toString).contains(x)) { success @@ -287,16 +251,16 @@ object Console extends Logging { c.copy(commands = c.commands :+ "eval") } children( arg[String]("<evaluation-class>") action { (x, c) => - c.copy(common = c.common.copy(evaluation = Some(x))) + c.copy(workflow = c.workflow.copy(evaluation = Some(x))) }, arg[String]("[<engine-parameters-generator-class>]") optional() action { (x, c) => - c.copy(common = c.common.copy(engineParamsGenerator = Some(x))) + c.copy(workflow = c.workflow.copy(engineParamsGenerator = Some(x))) } text("Optional engine parameters generator class, overriding the first argument"), opt[String]("batch") action { (x, c) => - c.copy(common = c.common.copy(batch = x)) + c.copy(workflow = c.workflow.copy(batch = x)) } text("Batch label of the run."), opt[String]("json-extractor") action { (x, c) => - c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x))) + c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x))) } validate { x => if (JsonExtractorOption.values.map(_.toString).contains(x)) { success @@ -315,7 +279,7 @@ object Console extends Logging { c.copy(commands = c.commands :+ "deploy") } children( opt[String]("batch") action { (x, c) => - c.copy(common = c.common.copy(batch = x)) + c.copy(workflow = c.workflow.copy(batch = x)) } text("Batch label of the deployment."), opt[String]("engine-instance-id") action { (x, c) => c.copy(engineInstanceId = Some(x)) @@ -354,7 +318,7 @@ object Console extends Logging { c.copy(deploy = c.deploy.copy(logPrefix = Some(x))) }, opt[String]("json-extractor") action { (x, c) => - c.copy(common = c.common.copy(jsonExtractor = JsonExtractorOption.withName(x))) + c.copy(workflow = c.workflow.copy(jsonExtractor = JsonExtractorOption.withName(x))) } validate { x => if (JsonExtractorOption.values.map(_.toString).contains(x)) { success @@ -449,23 +413,10 @@ object Console extends Logging { } note("") cmd("upgrade"). - text("Upgrade tool"). + text("No longer supported!"). action { (_, c) => c.copy(commands = c.commands :+ "upgrade") - } children( - arg[String]("<from version>") action { (x, c) => - c.copy(upgrade = c.upgrade.copy(from = x)) - } text("The version upgraded from."), - arg[String]("<to version>") action { (x, c) => - c.copy(upgrade = c.upgrade.copy(to = x)) - } text("The version upgraded to."), - arg[Int]("<old App ID>") action { (x, c) => - c.copy(upgrade = c.upgrade.copy(oldAppId = x)) - } text("Old App ID."), - arg[Int]("<new App ID>") action { (x, c) => - c.copy(upgrade = c.upgrade.copy(newAppId = x)) - } text("New App ID.") - ) + } note("") cmd("app"). text("Manage apps.\n"). @@ -616,29 +567,12 @@ object Console extends Logging { c.copy(commands = c.commands :+ "template") } children( cmd("get"). + text("No longer supported! Use git clone to download a template"). action { (_, c) => c.copy(commands = c.commands :+ "get") - } children( - arg[String]("<template ID>") required() action { (x, c) => - c.copy(template = c.template.copy(repository = x)) - }, - arg[String]("<new engine directory>") action { (x, c) => - c.copy(template = c.template.copy(directory = x)) - }, - opt[String]("version") action { (x, c) => - c.copy(template = c.template.copy(version = Some(x))) - }, - opt[String]("name") action { (x, c) => - c.copy(template = c.template.copy(name = Some(x))) - }, - opt[String]("package") action { (x, c) => - c.copy(template = c.template.copy(packageName = Some(x))) - }, - opt[String]("email") action { (x, c) => - c.copy(template = c.template.copy(email = Some(x))) - } - ), + }, cmd("list"). + text("No longer supported! Use git to manage your templates"). action { (_, c) => c.copy(commands = c.commands :+ "list") } @@ -694,78 +628,95 @@ object Console extends Logging { } parser.parse(consoleArgs, ConsoleArgs()) map { pca => - val ca = pca.copy(common = pca.common.copy( - sparkPassThrough = sparkPassThroughArgs, - driverPassThrough = driverPassThroughArgs)) - WorkflowUtils.modifyLogging(ca.common.verbose) + val ca = pca.copy( + spark = pca.spark.copy(sparkPassThrough = sparkPassThroughArgs), + driverPassThrough = driverPassThroughArgs) + WorkflowUtils.modifyLogging(ca.verbose) val rv: Int = ca.commands match { case Seq("") => System.err.println(help()) 1 case Seq("version") => - version(ca) - 0 + Pio.version() case Seq("build") => - regenerateManifestJson(ca.common.manifestJson) - build(ca) + Pio.build( + ca.build, ca.pioHome.get, ca.engine.manifestJson, ca.verbose) case Seq("unregister") => - unregister(ca) - 0 + Pio.unregister(ca.engine.manifestJson) case Seq("train") => - regenerateManifestJson(ca.common.manifestJson) - train(ca) + Pio.train( + ca.engine, ca.workflow, ca.spark, ca.pioHome.get, ca.verbose) case Seq("eval") => - regenerateManifestJson(ca.common.manifestJson) - train(ca) + Pio.eval( + ca.engine, ca.workflow, ca.spark, ca.pioHome.get, ca.verbose) case Seq("deploy") => - deploy(ca) + Pio.deploy( + ca.engine, + ca.engineInstanceId, + ServerArgs( + ca.deploy, + ca.eventServer, + ca.workflow.batch, + ca.accessKey.accessKey, + ca.workflow.variantJson, + ca.workflow.jsonExtractor), + ca.spark, + ca.pioHome.get, + ca.verbose) case Seq("undeploy") => - undeploy(ca) + Pio.undeploy(ca.deploy) case Seq("dashboard") => - dashboard(ca) - 0 + Pio.dashboard(ca.dashboard) case Seq("eventserver") => - eventserver(ca) - 0 + Pio.eventserver(ca.eventServer) case Seq("adminserver") => - adminserver(ca) - 0 + Pio.adminserver(ca.adminServer) case Seq("run") => - generateManifestJson(ca.common.manifestJson) - run(ca) + Pio.run( + ca.mainClass.get, + ca.driverPassThrough, + ca.engine.manifestJson, + ca.build, + ca.spark, + ca.pioHome.get, + ca.verbose) case Seq("status") => - status(ca) + Pio.status(ca.pioHome, ca.spark.sparkHome) case Seq("upgrade") => - upgrade(ca) - 0 + error("Upgrade is no longer supported") + 1 case Seq("app", "new") => - App.create(ca) + Pio.App.create( + ca.app.name, ca.app.id, ca.app.description, ca.accessKey.accessKey) case Seq("app", "list") => - App.list(ca) + Pio.App.list() case Seq("app", "show") => - App.show(ca) + Pio.App.show(ca.app.name) case Seq("app", "delete") => - App.delete(ca) + Pio.App.delete(ca.app.name, ca.app.force) case Seq("app", "data-delete") => - App.dataDelete(ca) + Pio.App.dataDelete( + ca.app.name, ca.app.dataDeleteChannel, ca.app.all, ca.app.force) case Seq("app", "channel-new") => - App.channelNew(ca) + Pio.App.channelNew(ca.app.name, ca.app.channel) case Seq("app", "channel-delete") => - App.channelDelete(ca) + Pio.App.channelDelete(ca.app.name, ca.app.channel, ca.app.force) case Seq("accesskey", "new") => - AccessKey.create(ca) + Pio.AccessKey.create( + ca.app.name, ca.accessKey.accessKey, ca.accessKey.events) case Seq("accesskey", "list") => - AccessKey.list(ca) + Pio.AccessKey.list( + if (ca.app.name == "") None else Some(ca.app.name)) case Seq("accesskey", "delete") => - AccessKey.delete(ca) - case Seq("template", "get") => - Template.get(ca) - case Seq("template", "list") => - Template.list(ca) + Pio.AccessKey.delete(ca.accessKey.accessKey) + case Seq("template", _) => + error("template commands are no longer supported.") + error("Please use git to get and manage your templates.") + 1 case Seq("export") => - Export.eventsToFile(ca) + Pio.export(ca.export, ca.spark, ca.pioHome.get) case Seq("import") => - Import.fileToEvents(ca) + Pio.imprt(ca.imprt, ca.spark, ca.pioHome.get) case _ => System.err.println(help(ca.commands)) 1 @@ -809,465 +760,4 @@ object Console extends Logging { "run" -> txt.run().toString, "eval" -> txt.eval().toString, "dashboard" -> txt.dashboard().toString) - - def version(ca: ConsoleArgs): Unit = println(BuildInfo.version) - - def build(ca: ConsoleArgs): Int = { - Template.verifyTemplateMinVersion(new File("template.json")) - compile(ca) - info("Looking for an engine...") - val jarFiles = jarFilesForScala - if (jarFiles.isEmpty) { - error("No engine found. Your build might have failed. Aborting.") - return 1 - } - jarFiles foreach { f => info(s"Found ${f.getName}")} - RegisterEngine.registerEngine( - ca.common.manifestJson, - jarFiles, - false) - info("Your engine is ready for training.") - 0 - } - - def unregister(ca: ConsoleArgs): Unit = { - RegisterEngine.unregisterEngine(ca.common.manifestJson) - } - - def train(ca: ConsoleArgs): Int = { - Template.verifyTemplateMinVersion(new File("template.json")) - withRegisteredManifest( - ca.common.manifestJson, - ca.common.engineId, - ca.common.engineVersion) { em => - RunWorkflow.newRunWorkflow(ca, em) - } - } - - def deploy(ca: ConsoleArgs): Int = { - Template.verifyTemplateMinVersion(new File("template.json")) - withRegisteredManifest( - ca.common.manifestJson, - ca.common.engineId, - ca.common.engineVersion) { em => - val variantJson = parse(Source.fromFile(ca.common.variantJson).mkString) - val variantId = variantJson \ "id" match { - case JString(s) => s - case _ => - error("Unable to read engine variant ID from " + - s"${ca.common.variantJson.getCanonicalPath}. Aborting.") - return 1 - } - val engineInstances = storage.Storage.getMetaDataEngineInstances - val engineInstance = ca.engineInstanceId map { eid => - engineInstances.get(eid) - } getOrElse { - engineInstances.getLatestCompleted(em.id, em.version, variantId) - } - engineInstance map { r => - RunServer.newRunServer(ca, em, r.id) - } getOrElse { - ca.engineInstanceId map { eid => - error( - s"Invalid engine instance ID ${ca.engineInstanceId}. Aborting.") - } getOrElse { - error( - s"No valid engine instance found for engine ${em.id} " + - s"${em.version}.\nTry running 'train' before 'deploy'. Aborting.") - } - 1 - } - } - } - - def dashboard(ca: ConsoleArgs): Unit = { - info(s"Creating dashboard at ${ca.dashboard.ip}:${ca.dashboard.port}") - Dashboard.createDashboard(DashboardConfig( - ip = ca.dashboard.ip, - port = ca.dashboard.port)) - } - - def eventserver(ca: ConsoleArgs): Unit = { - info( - s"Creating Event Server at ${ca.eventServer.ip}:${ca.eventServer.port}") - EventServer.createEventServer(EventServerConfig( - ip = ca.eventServer.ip, - port = ca.eventServer.port, - stats = ca.eventServer.stats)) - } - - def adminserver(ca: ConsoleArgs): Unit = { - info( - s"Creating Admin Server at ${ca.adminServer.ip}:${ca.adminServer.port}") - AdminServer.createAdminServer(AdminServerConfig( - ip = ca.adminServer.ip, - port = ca.adminServer.port - )) - } - - def undeploy(ca: ConsoleArgs): Int = { - val serverUrl = s"http://${ca.deploy.ip}:${ca.deploy.port}" - info( - s"Undeploying any existing engine instance at ${serverUrl}") - try { - val code = Http(s"${serverUrl}/stop").asString.code - code match { - case 200 => 0 - case 404 => - error(s"Another process is using ${serverUrl}. Unable to undeploy.") - 1 - case _ => - error(s"Another process is using ${serverUrl}, or an existing " + - s"engine server is not responding properly (HTTP ${code}). " + - "Unable to undeploy.") - 1 - } - } catch { - case e: java.net.ConnectException => - warn(s"Nothing at ${serverUrl}") - 0 - case _: Throwable => - error("Another process might be occupying " + - s"${ca.deploy.ip}:${ca.deploy.port}. Unable to undeploy.") - 1 - } - } - - def compile(ca: ConsoleArgs): Unit = { - // only add pioVersion to sbt if project/pio.sbt exists - if (new File("project", "pio-build.sbt").exists || ca.build.forceGeneratePIOSbt) { - FileUtils.writeLines( - new File("pio.sbt"), - Seq( - "// Generated automatically by pio build.", - "// Changes in this file will be overridden.", - "", - "pioVersion := \"" + BuildInfo.version + "\"")) - } - implicit val formats = Utils.json4sDefaultFormats - - val sbt = detectSbt(ca) - info(s"Using command '${sbt}' at the current working directory to build.") - info("If the path above is incorrect, this process will fail.") - val asm = - if (ca.build.sbtAssemblyPackageDependency) { - " assemblyPackageDependency" - } else { - "" - } - val clean = if (ca.build.sbtClean) " clean" else "" - val buildCmd = s"${sbt} ${ca.build.sbtExtra.getOrElse("")}${clean} " + - (if (ca.build.uberJar) "assembly" else s"package${asm}") - val core = new File(s"pio-assembly-${BuildInfo.version}.jar") - if (ca.build.uberJar) { - info(s"Uber JAR enabled. Putting ${core.getName} in lib.") - val dst = new File("lib") - dst.mkdir() - FileUtils.copyFileToDirectory( - coreAssembly(ca.common.pioHome.get), - dst, - true) - } else { - if (new File("engine.json").exists()) { - info(s"Uber JAR disabled. Making sure lib/${core.getName} is absent.") - new File("lib", core.getName).delete() - } else { - info("Uber JAR disabled, but current working directory does not look " + - s"like an engine project directory. Please delete lib/${core.getName} manually.") - } - } - info(s"Going to run: ${buildCmd}") - try { - val r = - if (ca.common.verbose) { - buildCmd.!(ProcessLogger(line => info(line), line => error(line))) - } else { - buildCmd.!(ProcessLogger( - line => outputSbtError(line), - line => outputSbtError(line))) - } - if (r != 0) { - error(s"Return code of previous step is ${r}. Aborting.") - sys.exit(1) - } - info("Build finished successfully.") - } catch { - case e: java.io.IOException => - error(s"${e.getMessage}") - sys.exit(1) - } - } - - private def outputSbtError(line: String): Unit = { - """\[.*error.*\]""".r findFirstIn line foreach { _ => error(line) } - } - - def run(ca: ConsoleArgs): Int = { - compile(ca) - - val extraFiles = WorkflowUtils.thirdPartyConfFiles - - val jarFiles = jarFilesForScala - jarFiles foreach { f => info(s"Found JAR: ${f.getName}") } - val allJarFiles = jarFiles.map(_.getCanonicalPath) - val cmd = s"${getSparkHome(ca.common.sparkHome)}/bin/spark-submit --jars " + - s"${allJarFiles.mkString(",")} " + - (if (extraFiles.size > 0) { - s"--files ${extraFiles.mkString(",")} " - } else { - "" - }) + - "--class " + - s"${ca.mainClass.get} ${ca.common.sparkPassThrough.mkString(" ")} " + - coreAssembly(ca.common.pioHome.get) + " " + - ca.common.driverPassThrough.mkString(" ") - val proc = Process( - cmd, - None, - "SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")). - map(kv => s"${kv._1}=${kv._2}").mkString(",")) - info(s"Submission command: ${cmd}") - val r = proc.! - if (r != 0) { - error(s"Return code of previous step is ${r}. Aborting.") - return 1 - } - r - } - - def status(ca: ConsoleArgs): Int = { - info("Inspecting PredictionIO...") - ca.common.pioHome map { pioHome => - info(s"PredictionIO ${BuildInfo.version} is installed at $pioHome") - } getOrElse { - error("Unable to locate PredictionIO installation. Aborting.") - return 1 - } - info("Inspecting Apache Spark...") - val sparkHome = getSparkHome(ca.common.sparkHome) - if (new File(s"$sparkHome/bin/spark-submit").exists) { - info(s"Apache Spark is installed at $sparkHome") - val sparkMinVersion = "1.3.0" - val sparkReleaseFile = new File(s"$sparkHome/RELEASE") - if (sparkReleaseFile.exists) { - val sparkReleaseStrings = - Source.fromFile(sparkReleaseFile).mkString.split(' ') - if (sparkReleaseStrings.length < 2) { - warn(stripMarginAndNewlines( - s"""|Apache Spark version information cannot be found (RELEASE file - |is empty). This is a known issue for certain vendors (e.g. - |Cloudera). Please make sure you are using a version of at least - |$sparkMinVersion.""")) - } else { - val sparkReleaseVersion = sparkReleaseStrings(1) - val parsedMinVersion = Version.apply(sparkMinVersion) - val parsedCurrentVersion = Version.apply(sparkReleaseVersion) - if (parsedCurrentVersion >= parsedMinVersion) { - info(stripMarginAndNewlines( - s"""|Apache Spark $sparkReleaseVersion detected (meets minimum - |requirement of $sparkMinVersion)""")) - } else { - error(stripMarginAndNewlines( - s"""|Apache Spark $sparkReleaseVersion detected (does not meet - |minimum requirement. Aborting.""")) - } - } - } else { - warn(stripMarginAndNewlines( - s"""|Apache Spark version information cannot be found. If you are - |using a developmental tree, please make sure you are using a - |version of at least $sparkMinVersion.""")) - } - } else { - error("Unable to locate a proper Apache Spark installation. Aborting.") - return 1 - } - info("Inspecting storage backend connections...") - try { - storage.Storage.verifyAllDataObjects() - } catch { - case e: Throwable => - error("Unable to connect to all storage backends successfully. The " + - "following shows the error message from the storage backend.") - error(s"${e.getMessage} (${e.getClass.getName})", e) - error("Dumping configuration of initialized storage backend sources. " + - "Please make sure they are correct.") - storage.Storage.config.get("sources") map { src => - src foreach { case (s, p) => - error(s"Source Name: $s; Type: ${p.getOrElse("type", "(error)")}; " + - s"Configuration: ${p.getOrElse("config", "(error)")}") - } - } getOrElse { - error("No properly configured storage backend sources.") - } - return 1 - } - info("(sleeping 5 seconds for all messages to show up...)") - Thread.sleep(5000) - info("Your system is all ready to go.") - 0 - } - - def upgrade(ca: ConsoleArgs): Unit = { - (ca.upgrade.from, ca.upgrade.to) match { - case ("0.8.2", "0.8.3") => { - Upgrade_0_8_3.runMain(ca.upgrade.oldAppId, ca.upgrade.newAppId) - } - case _ => - println(s"Upgrade from version ${ca.upgrade.from} to ${ca.upgrade.to}" - + s" is not supported.") - } - } - - def coreAssembly(pioHome: String): File = { - val core = s"pio-assembly-${BuildInfo.version}.jar" - val coreDir = - if (new File(pioHome + File.separator + "RELEASE").exists) { - new File(pioHome + File.separator + "lib") - } else { - new File(pioHome + File.separator + "assembly") - } - val coreFile = new File(coreDir, core) - if (coreFile.exists) { - coreFile - } else { - error(s"PredictionIO Core Assembly (${coreFile.getCanonicalPath}) does " + - "not exist. Aborting.") - sys.exit(1) - } - } - - val manifestAutogenTag = "pio-autogen-manifest" - - def regenerateManifestJson(json: File): Unit = { - val cwd = sys.props("user.dir") - val ha = java.security.MessageDigest.getInstance("SHA-1"). - digest(cwd.getBytes).map("%02x".format(_)).mkString - if (json.exists) { - val em = readManifestJson(json) - if (em.description == Some(manifestAutogenTag) && ha != em.version) { - warn("This engine project directory contains an auto-generated " + - "manifest that has been copied/moved from another location. ") - warn("Regenerating the manifest to reflect the updated location. " + - "This will dissociate with all previous engine instances.") - generateManifestJson(json) - } else { - info(s"Using existing engine manifest JSON at ${json.getCanonicalPath}") - } - } else { - generateManifestJson(json) - } - } - - def generateManifestJson(json: File): Unit = { - val cwd = sys.props("user.dir") - implicit val formats = Utils.json4sDefaultFormats + - new EngineManifestSerializer - val rand = Random.alphanumeric.take(32).mkString - val ha = java.security.MessageDigest.getInstance("SHA-1"). - digest(cwd.getBytes).map("%02x".format(_)).mkString - val em = EngineManifest( - id = rand, - version = ha, - name = new File(cwd).getName, - description = Some(manifestAutogenTag), - files = Seq(), - engineFactory = "") - try { - FileUtils.writeStringToFile(json, write(em), "ISO-8859-1") - } catch { - case e: java.io.IOException => - error(s"Cannot generate ${json} automatically (${e.getMessage}). " + - "Aborting.") - sys.exit(1) - } - } - - def readManifestJson(json: File): EngineManifest = { - implicit val formats = Utils.json4sDefaultFormats + - new EngineManifestSerializer - try { - read[EngineManifest](Source.fromFile(json).mkString) - } catch { - case e: java.io.FileNotFoundException => - error(s"${json.getCanonicalPath} does not exist. Aborting.") - sys.exit(1) - case e: MappingException => - error(s"${json.getCanonicalPath} has invalid content: " + - e.getMessage) - sys.exit(1) - } - } - - def withRegisteredManifest( - json: File, - engineId: Option[String], - engineVersion: Option[String])( - op: EngineManifest => Int): Int = { - val ej = readManifestJson(json) - val id = engineId getOrElse ej.id - val version = engineVersion getOrElse ej.version - storage.Storage.getMetaDataEngineManifests.get(id, version) map { - op - } getOrElse { - error(s"Engine ${id} ${version} cannot be found in the system.") - error("Possible reasons:") - error("- the engine is not yet built by the 'build' command;") - error("- the meta data store is offline.") - 1 - } - } - - def jarFilesAt(path: File): Array[File] = recursiveListFiles(path) filter { - _.getName.toLowerCase.endsWith(".jar") - } - - def jarFilesForScala: Array[File] = { - val libFiles = jarFilesForScalaFilter(jarFilesAt(new File("lib"))) - val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File("target" + - File.separator + s"scala-${scalaVersionNoPatch}"))) - // Use libFiles is target is empty. - if (targetFiles.size > 0) targetFiles else libFiles - } - - def jarFilesForScalaFilter(jars: Array[File]): Array[File] = - jars.filterNot { f => - f.getName.toLowerCase.endsWith("-javadoc.jar") || - f.getName.toLowerCase.endsWith("-sources.jar") - } - - def recursiveListFiles(f: File): Array[File] = { - Option(f.listFiles) map { these => - these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles) - } getOrElse Array[File]() - } - - def getSparkHome(sparkHome: Option[String]): String = { - sparkHome getOrElse { - sys.env.getOrElse("SPARK_HOME", ".") - } - } - - def versionNoPatch(fullVersion: String): String = { - val v = """^(\d+\.\d+)""".r - val versionNoPatch = for { - v(np) <- v findFirstIn fullVersion - } yield np - versionNoPatch.getOrElse(fullVersion) - } - - def scalaVersionNoPatch: String = versionNoPatch(BuildInfo.scalaVersion) - - def detectSbt(ca: ConsoleArgs): String = { - ca.build.sbt map { - _.getCanonicalPath - } getOrElse { - val f = new File(Seq(ca.common.pioHome.get, "sbt", "sbt").mkString( - File.separator)) - if (f.exists) f.getCanonicalPath else "sbt" - } - } - - def stripMarginAndNewlines(string: String): String = - string.stripMargin.replaceAll("\n", " ") } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala deleted file mode 100644 index 20c9e02..0000000 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Export.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.tools.console - -import org.apache.predictionio.tools.Runner - -case class ExportArgs( - appId: Int = 0, - channel: Option[String] = None, - outputPath: String = "", - format: String = "json") - -object Export { - def eventsToFile(ca: ConsoleArgs): Int = { - val channelArg = ca.export.channel - .map(ch => Seq("--channel", ch)).getOrElse(Nil) - Runner.runOnSpark( - "org.apache.predictionio.tools.export.EventsToFile", - Seq( - "--appid", - ca.export.appId.toString, - "--output", - ca.export.outputPath, - "--format", - ca.export.format) ++ channelArg, - ca, - Nil) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala deleted file mode 100644 index b808574..0000000 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Import.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.tools.console - -import org.apache.predictionio.tools.Runner - -case class ImportArgs( - appId: Int = 0, - channel: Option[String] = None, - inputPath: String = "") - -object Import { - def fileToEvents(ca: ConsoleArgs): Int = { - val channelArg = ca.imprt.channel - .map(ch => Seq("--channel", ch)).getOrElse(Nil) - Runner.runOnSpark( - "org.apache.predictionio.tools.imprt.FileToEvents", - Seq( - "--appid", - ca.imprt.appId.toString, - "--input", - ca.imprt.inputPath) ++ channelArg, - ca, - Nil) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala new file mode 100644 index 0000000..77075a7 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Pio.scala @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools.console + +import org.apache.predictionio.tools.{ + EventServerArgs, SparkArgs, WorkflowArgs, ServerArgs, DeployArgs} +import org.apache.predictionio.tools.commands.Management +import org.apache.predictionio.tools.commands.{ + DashboardArgs, AdminServerArgs, ImportArgs, ExportArgs, + BuildArgs, EngineArgs, AppDescription} +import org.apache.predictionio.tools.commands.Engine +import org.apache.predictionio.tools.commands.{ + App => AppCmd, AccessKey => AccessKeysCmd} +import org.apache.predictionio.tools.ReturnTypes._ +import org.apache.predictionio.tools.commands.Import +import org.apache.predictionio.tools.commands.Export + +import grizzled.slf4j.Logging +import scala.concurrent.{Future, ExecutionContext, Await} +import scala.concurrent.duration._ +import scala.language.implicitConversions +import scala.sys.process._ +import java.io.File + +import akka.actor.ActorSystem + +object Pio extends Logging { + + private implicit def eitherToInt[A, B](result: Either[A, B]): Int = { + result fold (_ => 1, _ => 0) + } + + private def doOnSuccess[A, B](result: Either[A, B])(f: B => Int): Int = { + result match { + case Left(_) => 1 + case Right(res) => f(res) + } + } + + private def processAwaitAndClean(maybeProc: Expected[(Process, () => Unit)]) = { + maybeProc match { + case Left(_) => 1 + + case Right((proc, cleanup)) => + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + def run(): Unit = { + cleanup() + proc.destroy() + } + })) + val returnVal = proc.exitValue() + cleanup() + returnVal + } + } + + def version(): Int = { + println(Management.version) + 0 + } + + def build( + buildArgs: BuildArgs, + pioHome: String, + manifestJson: File, + verbose: Boolean = false): Int = { + + doOnSuccess(Engine.build(buildArgs, pioHome, manifestJson, verbose)) { + _ => info("Your engine is ready for training.") + 0 + } + } + + def unregister(manifestJson: File): Int = Engine.unregister(manifestJson) + + def train( + ea: EngineArgs, + wa: WorkflowArgs, + sa: SparkArgs, + pioHome: String, + verbose: Boolean = false): Int = + processAwaitAndClean(Engine.train(ea, wa, sa, pioHome, verbose)) + + def eval( + ea: EngineArgs, + wa: WorkflowArgs, + sa: SparkArgs, + pioHome: String, + verbose: Boolean = false): Int = + processAwaitAndClean(Engine.train(ea, wa, sa, pioHome, verbose)) + + def deploy( + ea: EngineArgs, + engineInstanceId: Option[String], + serverArgs: ServerArgs, + sparkArgs: SparkArgs, + pioHome: String, + verbose: Boolean = false): Int = + processAwaitAndClean(Engine.deploy( + ea, engineInstanceId, serverArgs, sparkArgs, pioHome, verbose)) + + def undeploy(da: DeployArgs): Int = Engine.undeploy(da) + + def dashboard(da: DashboardArgs): Int = { + Management.dashboard(da).awaitTermination + 0 + } + + def eventserver(ea: EventServerArgs): Int = { + Management.eventserver(ea).awaitTermination + 0 + } + + def adminserver(aa: AdminServerArgs): Int = { + Management.adminserver(aa).awaitTermination + 0 + } + + def run( + mainClass: String, + driverArguments: Seq[String], + manifestJson: File, + buildArgs: BuildArgs, + sparkArgs: SparkArgs, + pioHome: String, + verbose: Boolean = false): Int = + doOnSuccess(Engine.run( + mainClass, driverArguments, manifestJson, + buildArgs, sparkArgs, pioHome, verbose)) { proc => + + val r = proc.exitValue() + if (r != 0) { + error(s"Return code of previous step is ${r}. Aborting.") + return 1 + } + r + } + + + def status(pioHome: Option[String], sparkHome: Option[String]): Int = { + Management.status(pioHome, sparkHome) + } + + def imprt(ia: ImportArgs, sa: SparkArgs, pioHome: String): Int = { + processAwaitAndClean(Import.fileToEvents(ia, sa, pioHome)) + } + + def export(ea: ExportArgs, sa: SparkArgs, pioHome: String): Int = { + processAwaitAndClean(Export.eventsToFile(ea, sa, pioHome)) + } + + object App { + + def create( + name: String, + id: Option[Int] = None, + description: Option[String] = None, + accessKey: String = ""): Int = + doOnSuccess(AppCmd.create(name, id, description, accessKey)) { appDesc => + info("Created a new app:") + info(s" Name: ${appDesc.app.name}") + info(s" ID: ${appDesc.app.id}") + info(s"Access Key: ${appDesc.keys.head.key}") + 0 + } + + def list(): Int = { + val title = "Name" + val ak = "Access Key" + val apps = AppCmd.list + info(f"$title%20s | ID | $ak%64s | Allowed Event(s)") + apps foreach { appDesc => + appDesc.keys foreach { k => + val events = + if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" + info(f"${appDesc.app.name}%20s | ${appDesc.app.id}%4d | ${k.key}%64s | $events%s") + } + } + info(s"Finished listing ${apps.size} app(s).") + 0 + } + + def show(appName: String): Int = + doOnSuccess(AppCmd.show(appName)) { case (appDesc, chans) => + info(s" App Name: ${appDesc.app.name}") + info(s" App ID: ${appDesc.app.id}") + info(s" Description: ${appDesc.app.description.getOrElse("")}") + + var firstKey = true + appDesc.keys foreach { k => + val events = + if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" + if (firstKey) { + info(f" Access Key: ${k.key}%s | ${events}%s") + firstKey = false + } else { + info(f" ${k.key}%s | ${events}%s") + } + } + var firstChan = true + val titleName = "Channel Name" + val titleID = "Channel ID" + chans.foreach { ch => + if (firstChan) { + info(f" Channels: ${titleName}%16s | ${titleID}%10s ") + firstChan = false + } + info(f" ${ch.name}%16s | ${ch.id}%10s") + } + 0 + } + + def delete(name: String, force: Boolean = false): Int = + doOnSuccess(AppCmd.show(name)) { case (appDesc, chans) => + info(s"The following app (including all channels) will be deleted. Are you sure?") + info(s" App Name: ${appDesc.app.name}") + info(s" App ID: ${appDesc.app.id}") + info(s" Description: ${appDesc.app.description.getOrElse("")}") + var firstChan = true + val titleName = "Channel Name" + val titleID = "Channel ID" + chans.foreach { ch => + if (firstChan) { + info(f" Channels: ${titleName}%16s | ${titleID}%10s ") + firstChan = false + } + info(f" ${ch.name}%16s | ${ch.id}%10s") + } + + val choice = if(force) "YES" else readLine("Enter 'YES' to proceed: ") + choice match { + case "YES" => + AppCmd.delete(name) + case _ => + info("Aborted.") + 0 + } + } + + def dataDelete( + name: String, + channel: Option[String] = None, + all: Boolean = false, + force: Boolean = false): Int = + doOnSuccess(AppCmd.show(name)) { case (appDesc, chans) => + + val channelId = channel.map { ch => + val channelMap = chans.map(c => (c.name, c.id)).toMap + if (!channelMap.contains(ch)) { + error(s"Unable to delete data for channel.") + error(s"Channel ${ch} doesn't exist.") + return 1 + } + channelMap(ch) + } + if (all) { + info(s"All data of the app (including default and all channels) will be deleted." + + " Are you sure?") + } else if (channelId.isDefined) { + info(s"Data of the following channel will be deleted. Are you sure?") + info(s"Channel Name: ${channel.get}") + info(s" Channel ID: ${channelId.get}") + } else { + info(s"Data of the following app (default channel only) will be deleted. Are you sure?") + } + info(s" App Name: ${appDesc.app.name}") + info(s" App ID: ${appDesc.app.id}") + info(s" Description: ${appDesc.app.description}") + + val choice = if(force) "YES" else readLine("Enter 'YES' to proceed: ") + choice match { + case "YES" => + AppCmd.dataDelete(name, channel, all) + case _ => + info("Aborted.") + 0 + } + } + + def channelNew(appName: String, newChannel: String): Int = + AppCmd.channelNew(appName, newChannel) + + def channelDelete( + appName: String, + deleteChannel: String, + force: Boolean = false): Int = + doOnSuccess(AppCmd.show(appName)) { case (appDesc, chans) => + chans.find(chan => chan.name == deleteChannel) match { + case None => + error(s"Unable to delete channel.") + error(s"Channel ${deleteChannel} doesn't exist.") + 1 + case Some(chan) => + info(s"The following channel will be deleted. Are you sure?") + info(s" Channel Name: ${deleteChannel}") + info(s" Channel ID: ${chan.id}") + info(s" App Name: ${appDesc.app.name}") + info(s" App ID: ${appDesc.app.id}") + val choice = if(force) "YES" else readLine("Enter 'YES' to proceed: ") + choice match { + case "YES" => + AppCmd.channelDelete(appName, deleteChannel) + case _ => + info("Aborted.") + 0 + } + } + } + + } + + object AccessKey { + + def create( + appName: String, + key: String, + events: Seq[String]): Int = + AccessKeysCmd.create(appName, key, events) + + def list(app: Option[String]): Int = + doOnSuccess(AccessKeysCmd.list(app)) { keys => + val title = "Access Key(s)" + info(f"$title%64s | App ID | Allowed Event(s)") + keys.sortBy(k => k.appid) foreach { k => + val events = + if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" + info(f"${k.key}%64s | ${k.appid}%6d | $events%s") + } + info(s"Finished listing ${keys.size} access key(s).") + 0 + } + + def delete(key: String): Int = AccessKeysCmd.delete(key) + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala deleted file mode 100644 index 0136d25..0000000 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Template.scala +++ /dev/null @@ -1,432 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.predictionio.tools.console - -import java.io.BufferedInputStream -import java.io.BufferedOutputStream -import java.io.File -import java.io.FileInputStream -import java.io.FileOutputStream -import java.net.ConnectException -import java.net.URI -import java.util.zip.ZipInputStream - -import grizzled.slf4j.Logging -import org.apache.predictionio.controller.Utils -import org.apache.predictionio.core.BuildInfo -import org.apache.commons.io.FileUtils -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read -import org.json4s.native.Serialization.write -import semverfi._ - -import scala.io.Source -import scala.sys.process._ -import scalaj.http._ - -case class TemplateArgs( - directory: String = "", - repository: String = "", - version: Option[String] = None, - name: Option[String] = None, - packageName: Option[String] = None, - email: Option[String] = None) - -case class GitHubTag( - name: String, - zipball_url: String, - tarball_url: String, - commit: GitHubCommit) - -case class GitHubCommit( - sha: String, - url: String) - -case class GitHubCache( - headers: Map[String, String], - body: String) - -case class TemplateEntry( - repo: String) - -case class TemplateMetaData( - pioVersionMin: Option[String] = None) - -object Template extends Logging { - implicit val formats = Utils.json4sDefaultFormats - - def templateMetaData(templateJson: File): TemplateMetaData = { - if (!templateJson.exists) { - warn(s"$templateJson does not exist. Template metadata will not be available. " + - "(This is safe to ignore if you are not working on a template.)") - TemplateMetaData() - } else { - val jsonString = Source.fromFile(templateJson)(scala.io.Codec.ISO8859).mkString - val json = try { - parse(jsonString) - } catch { - case e: org.json4s.ParserUtil.ParseException => - warn(s"$templateJson cannot be parsed. Template metadata will not be available.") - return TemplateMetaData() - } - val pioVersionMin = json \ "pio" \ "version" \ "min" - pioVersionMin match { - case JString(s) => TemplateMetaData(pioVersionMin = Some(s)) - case _ => TemplateMetaData() - } - } - } - - /** Creates a wrapper that provides the functionality of scalaj.http.Http() - * with automatic proxy settings handling. The proxy settings will first - * come from "git" followed by system properties "http.proxyHost" and - * "http.proxyPort". - * - * @param url URL to be connected - * @return - */ - def httpOptionalProxy(url: String): HttpRequest = { - val gitProxy = try { - Some(Process("git config --global http.proxy").lines.toList(0)) - } catch { - case e: Throwable => None - } - - val (host, port) = gitProxy map { p => - val proxyUri = new URI(p) - (Option(proxyUri.getHost), - if (proxyUri.getPort == -1) None else Some(proxyUri.getPort)) - } getOrElse { - (sys.props.get("http.proxyHost"), - sys.props.get("http.proxyPort").map { p => - try { - Some(p.toInt) - } catch { - case e: NumberFormatException => None - } - } getOrElse None) - } - - (host, port) match { - case (Some(h), Some(p)) => Http(url).proxy(h, p) - case _ => Http(url) - } - } - - def getGitHubRepos( - repos: Seq[String], - apiType: String, - repoFilename: String): Map[String, GitHubCache] = { - val reposCache = try { - val cache = - Source.fromFile(repoFilename)(scala.io.Codec.ISO8859).mkString - read[Map[String, GitHubCache]](cache) - } catch { - case e: Throwable => Map[String, GitHubCache]() - } - val newReposCache = reposCache ++ (try { - repos.map { repo => - val url = s"https://api.github.com/repos/$repo/$apiType" - val http = httpOptionalProxy(url) - val response = reposCache.get(repo).map { cache => - cache.headers.get("ETag").map { etag => - http.header("If-None-Match", etag).asString - } getOrElse { - http.asString - } - } getOrElse { - http.asString - } - - val body = if (response.code == 304) { - reposCache(repo).body - } else { - response.body - } - - repo -> GitHubCache(headers = response.headers, body = body) - }.toMap - } catch { - case e: ConnectException => - githubConnectErrorMessage(e) - Map() - }) - FileUtils.writeStringToFile( - new File(repoFilename), - write(newReposCache), - "ISO-8859-1") - newReposCache - } - - def sub(repo: String, name: String, email: String, org: String): Unit = { - val data = Map( - "repo" -> repo, - "name" -> name, - "email" -> email, - "org" -> org) - try { - httpOptionalProxy("https://update.prediction.io/templates.subscribe"). - postData("json=" + write(data)).asString - } catch { - case e: Throwable => error("Unable to subscribe.") - } - } - - def meta(repo: String, name: String, org: String): Unit = { - try { - httpOptionalProxy( - s"https://meta.prediction.io/templates/$repo/$org/$name").asString - } catch { - case e: Throwable => debug("Template metadata unavailable.") - } - } - - def list(ca: ConsoleArgs): Int = { - val templatesUrl = "https://templates.prediction.io/index.json" - try { - val templatesJson = Source.fromURL(templatesUrl).mkString("") - val templates = read[List[TemplateEntry]](templatesJson) - println("The following is a list of template IDs registered on " + - "PredictionIO Template Gallery:") - println() - templates.sortBy(_.repo.toLowerCase).foreach { template => - println(template.repo) - } - println() - println("Notice that it is possible use any GitHub repository as your " + - "engine template ID (e.g. YourOrg/YourTemplate).") - 0 - } catch { - case e: Throwable => - error(s"Unable to list templates from $templatesUrl " + - s"(${e.getMessage}). Aborting.") - 1 - } - } - - def githubConnectErrorMessage(e: ConnectException): Unit = { - error(s"Unable to connect to GitHub (Reason: ${e.getMessage}). " + - "Please check your network configuration and proxy settings.") - } - - def get(ca: ConsoleArgs): Int = { - val repos = - getGitHubRepos(Seq(ca.template.repository), "tags", ".templates-cache") - - repos.get(ca.template.repository).map { repo => - try { - read[List[GitHubTag]](repo.body) - } catch { - case e: MappingException => - error(s"Either ${ca.template.repository} is not a valid GitHub " + - "repository, or it does not have any tag. Aborting.") - return 1 - } - } getOrElse { - error(s"Failed to retrieve ${ca.template.repository}. Aborting.") - return 1 - } - - val name = ca.template.name getOrElse { - try { - Process("git config --global user.name").lines.toList(0) - } catch { - case e: Throwable => - readLine("Please enter author's name: ") - } - } - - val organization = ca.template.packageName getOrElse { - readLine( - "Please enter the template's Scala package name (e.g. com.mycompany): ") - } - - val email = ca.template.email getOrElse { - try { - Process("git config --global user.email").lines.toList(0) - } catch { - case e: Throwable => - readLine("Please enter author's e-mail address: ") - } - } - - println(s"Author's name: $name") - println(s"Author's e-mail: $email") - println(s"Author's organization: $organization") - - var subscribe = readLine("Would you like to be informed about new bug " + - "fixes and security updates of this template? (Y/n) ") - var valid = false - - do { - subscribe match { - case "" | "Y" | "y" => - sub(ca.template.repository, name, email, organization) - valid = true - case "n" | "N" => - meta(ca.template.repository, name, organization) - valid = true - case _ => - println("Please answer 'y' or 'n'") - subscribe = readLine("(Y/n)? ") - } - } while (!valid) - - val repo = repos(ca.template.repository) - - println(s"Retrieving ${ca.template.repository}") - val tags = read[List[GitHubTag]](repo.body) - println(s"There are ${tags.size} tags") - - if (tags.size == 0) { - println(s"${ca.template.repository} does not have any tag. Aborting.") - return 1 - } - - val tag = ca.template.version.map { v => - tags.find(_.name == v).getOrElse { - println(s"${ca.template.repository} does not have tag $v. Aborting.") - return 1 - } - } getOrElse tags.head - - println(s"Using tag ${tag.name}") - val url = - s"https://github.com/${ca.template.repository}/archive/${tag.name}.zip" - println(s"Going to download $url") - val trial = try { - httpOptionalProxy(url).asBytes - } catch { - case e: ConnectException => - githubConnectErrorMessage(e) - return 1 - } - val finalTrial = try { - trial.location.map { loc => - println(s"Redirecting to $loc") - httpOptionalProxy(loc).asBytes - } getOrElse trial - } catch { - case e: ConnectException => - githubConnectErrorMessage(e) - return 1 - } - val zipFilename = - s"${ca.template.repository.replace('/', '-')}-${tag.name}.zip" - FileUtils.writeByteArrayToFile( - new File(zipFilename), - finalTrial.body) - val zis = new ZipInputStream( - new BufferedInputStream(new FileInputStream(zipFilename))) - val bufferSize = 4096 - val filesToModify = collection.mutable.ListBuffer[String]() - var ze = zis.getNextEntry - while (ze != null) { - val filenameSegments = ze.getName.split(File.separatorChar) - val destFilename = (ca.template.directory +: filenameSegments.tail). - mkString(File.separator) - if (ze.isDirectory) { - new File(destFilename).mkdirs - } else { - val os = new BufferedOutputStream( - new FileOutputStream(destFilename), - bufferSize) - val data = Array.ofDim[Byte](bufferSize) - var count = zis.read(data, 0, bufferSize) - while (count != -1) { - os.write(data, 0, count) - count = zis.read(data, 0, bufferSize) - } - os.flush() - os.close() - - val nameOnly = new File(destFilename).getName - - if (organization != "" && - (nameOnly.endsWith(".scala") || - nameOnly == "build.sbt" || - nameOnly == "engine.json")) { - filesToModify += destFilename - } - } - ze = zis.getNextEntry - } - zis.close() - new File(zipFilename).delete - - val engineJsonFile = - new File(ca.template.directory, "engine.json") - - val engineJson = try { - Some(parse(Source.fromFile(engineJsonFile).mkString)) - } catch { - case e: java.io.IOException => - error("Unable to read engine.json. Skipping automatic package " + - "name replacement.") - None - case e: MappingException => - error("Unable to parse engine.json. Skipping automatic package " + - "name replacement.") - None - } - - val engineFactory = engineJson.map { ej => - (ej \ "engineFactory").extractOpt[String] - } getOrElse None - - engineFactory.map { ef => - val pkgName = ef.split('.').dropRight(1).mkString(".") - println(s"Replacing $pkgName with $organization...") - - filesToModify.foreach { ftm => - println(s"Processing $ftm...") - val fileContent = Source.fromFile(ftm).getLines() - val processedLines = - fileContent.map(_.replaceAllLiterally(pkgName, organization)) - FileUtils.writeStringToFile( - new File(ftm), - processedLines.mkString("\n")) - } - } getOrElse { - error("engineFactory is not found in engine.json. Skipping automatic " + - "package name replacement.") - } - - verifyTemplateMinVersion(new File(ca.template.directory, "template.json")) - - println(s"Engine template ${ca.template.repository} is now ready at " + - ca.template.directory) - - 0 - } - - def verifyTemplateMinVersion(templateJsonFile: File): Unit = { - val metadata = templateMetaData(templateJsonFile) - - metadata.pioVersionMin.foreach { pvm => - if (Version(BuildInfo.version) < Version(pvm)) { - error(s"This engine template requires at least PredictionIO $pvm. " + - s"The template may not work with PredictionIO ${BuildInfo.version}.") - sys.exit(1) - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala index 402e853..2e9fb26 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/dashboard/Dashboard.scala @@ -55,12 +55,13 @@ object Dashboard extends Logging with SSLConfiguration { } parser.parse(args, DashboardConfig()) map { dc => - createDashboard(dc) + createDashboard(dc).awaitTermination } } - def createDashboard(dc: DashboardConfig): Unit = { - implicit val system = ActorSystem("pio-dashboard") + def createDashboard(dc: DashboardConfig): ActorSystem = { + val systemName = "pio-dashboard" + implicit val system = ActorSystem(systemName) val service = system.actorOf(Props(classOf[DashboardActor], dc), "dashboard") implicit val timeout = Timeout(5.seconds) @@ -72,7 +73,7 @@ object Dashboard extends Logging with SSLConfiguration { interface = dc.ip, port = dc.port, settings = Some(settings.copy(sslEncryption = sslEnforced))) - system.awaitTermination + system } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/twirl/org/apache/predictionio/tools/console/template.scala.txt ---------------------------------------------------------------------- diff --git a/tools/src/main/twirl/org/apache/predictionio/tools/console/template.scala.txt b/tools/src/main/twirl/org/apache/predictionio/tools/console/template.scala.txt index f97c8ce..909e15c 100644 --- a/tools/src/main/twirl/org/apache/predictionio/tools/console/template.scala.txt +++ b/tools/src/main/twirl/org/apache/predictionio/tools/console/template.scala.txt @@ -1,25 +1,3 @@ Usage: pio template list -Retrieves a list of available template IDs. - -Usage: pio template get <template ID> <new engine directory> - [--version <version>] - [--name <value>] [--package <value>] [--email <value>] - -Seeds a directory with an engine template. - - <template ID> - Engine template ID. - <new engine directory> - Location of the new engine. - --version <value> - The template version to get. By default, the most recently tagged version - will be downloaded. - --name <value> - Name of the author of the new engine. - --package <value> - Scala package name of the new engine. - --email <value> - E-mail address of the author of the new engine. Specify this if you want - to receive updates (critical bug fixes, etc) about the engine template - that you are going to use. +No longer supported! Please use git to download and manage your templates.
