[PIO-28] Console refactor

Extract logic from functions handling console commands in the tools package into a separate commands package

Closes #283


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/e4a3c0c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/e4a3c0c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/e4a3c0c9

Branch: refs/heads/develop
Commit: e4a3c0c9fc1251d7355d921acb66168226446b3f
Parents: f7c30d3
Author: Marcin Ziemiński <[email protected]>
Authored: Tue Nov 1 10:54:07 2016 -0700
Committer: Donald Szeto <[email protected]>
Committed: Tue Nov 1 10:54:07 2016 -0700

----------------------------------------------------------------------
 .../predictionio/data/api/EventServer.scala     |   5 +-
 .../org/apache/predictionio/tools/Common.scala  | 114 +++
 .../predictionio/tools/RegisterEngine.scala     |  18 +-
 .../apache/predictionio/tools/RunServer.scala   | 176 ++---
 .../apache/predictionio/tools/RunWorkflow.scala | 195 +----
 .../org/apache/predictionio/tools/Runner.scala  |  66 +-
 .../predictionio/tools/admin/AdminAPI.scala     |   5 +-
 .../predictionio/tools/commands/AccessKey.scala |  70 ++
 .../predictionio/tools/commands/App.scala       | 365 +++++++++
 .../predictionio/tools/commands/Engine.scala    | 424 +++++++++++
 .../predictionio/tools/commands/Export.scala    |  54 ++
 .../predictionio/tools/commands/Import.scala    |  51 ++
 .../tools/commands/Management.scala             | 178 +++++
 .../predictionio/tools/commands/Template.scala  |  71 ++
 .../predictionio/tools/console/AccessKey.scala  |  86 ---
 .../apache/predictionio/tools/console/App.scala | 540 --------------
 .../predictionio/tools/console/Console.scala    | 734 +++----------------
 .../predictionio/tools/console/Export.scala     |  45 --
 .../predictionio/tools/console/Import.scala     |  42 --
 .../apache/predictionio/tools/console/Pio.scala | 352 +++++++++
 .../predictionio/tools/console/Template.scala   | 432 -----------
 .../tools/dashboard/Dashboard.scala             |   9 +-
 .../tools/console/template.scala.txt            |  24 +-
 .../tools/console/upgrade.scala.txt             |  14 +-
 24 files changed, 1932 insertions(+), 2138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
index a0ba40f..648316e 100644
--- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala
@@ -611,7 +611,7 @@ case class EventServerConfig(
   stats: Boolean = false)
 
 object EventServer {
-  def createEventServer(config: EventServerConfig): Unit = {
+  def createEventServer(config: EventServerConfig): ActorSystem = {
     implicit val system = ActorSystem("EventServerSystem")
 
     val eventClient = Storage.getLEvents()
@@ -630,7 +630,7 @@ object EventServer {
     if (config.stats) system.actorOf(Props[StatsActor], "StatsActor")
     system.actorOf(Props[PluginsActor], "PluginsActor")
     serverActor ! StartServer(config.ip, config.port)
-    system.awaitTermination()
+    system
   }
 }
 
@@ -639,5 +639,6 @@ object Run {
     EventServer.createEventServer(EventServerConfig(
       ip = "0.0.0.0",
       port = 7070))
+    .awaitTermination
   }
 }
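
createEventServer now hands the ActorSystem back to the caller instead of
blocking on awaitTermination itself, so embedders decide when to wait. A
minimal sketch of a hypothetical embedding caller (names and values here are
illustrative, not part of this commit):

    import org.apache.predictionio.data.api.{EventServer, EventServerConfig}

    object EmbeddedEventServer {
      def main(args: Array[String]): Unit = {
        // Start the server; this call no longer blocks.
        val system = EventServer.createEventServer(
          EventServerConfig(ip = "127.0.0.1", port = 7070))
        // The caller now owns the lifecycle and blocks explicitly.
        system.awaitTermination()
      }
    }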

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/Common.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Common.scala b/tools/src/main/scala/org/apache/predictionio/tools/Common.scala
new file mode 100644
index 0000000..c379138
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Common.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools
+
+import org.apache.predictionio.core.BuildInfo
+import org.apache.predictionio.tools.ReturnTypes._
+
+import grizzled.slf4j.Logging
+import java.io.File
+
+
+object ReturnTypes {
+  sealed case class Ok()
+
+  type MaybeError = Either[String, Ok]
+  type Expected[T] = Either[String, T]
+
+  val Success: MaybeError = Right(Ok())
+}
+
+trait EitherLogging extends Logging {
+  import ReturnTypes._
+
+  protected def logAndFail[T](msg: => String): Expected[T] = {
+    error(msg)
+    Left(msg)
+  }
+
+  protected def logOnFail[T](msg: => String, t: => Throwable): Expected[T] = {
+    error(msg, t)
+    Left(msg)
+  }
+
+  protected def logAndReturn[T](value: T, msg: => Any): Expected[T] = {
+    info(msg)
+    Right(value)
+  }
+
+  protected def logAndSucceed(msg: => Any): MaybeError = {
+    info(msg)
+    Success
+  }
+}
+
+object Common extends EitherLogging {
+
+  def getSparkHome(sparkHome: Option[String]): String = {
+    sparkHome getOrElse {
+      sys.env.getOrElse("SPARK_HOME", ".")
+    }
+  }
+
+  def versionNoPatch(fullVersion: String): String = {
+    val v = """^(\d+\.\d+)""".r
+    val versionNoPatch = for {
+      v(np) <- v findFirstIn fullVersion
+    } yield np
+    versionNoPatch.getOrElse(fullVersion)
+  }
+
+  def jarFilesForScala: Array[File] = {
+    def recursiveListFiles(f: File): Array[File] = {
+      Option(f.listFiles) map { these =>
+        these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles)
+      } getOrElse Array[File]()
+    }
+    def jarFilesForScalaFilter(jars: Array[File]): Array[File] =
+      jars.filterNot { f =>
+        f.getName.toLowerCase.endsWith("-javadoc.jar") ||
+        f.getName.toLowerCase.endsWith("-sources.jar")
+      }
+    def jarFilesAt(path: File): Array[File] = recursiveListFiles(path) filter {
+      _.getName.toLowerCase.endsWith(".jar")
+    }
+    val libFiles = jarFilesForScalaFilter(jarFilesAt(new File("lib")))
+    val scalaVersionNoPatch = Common.versionNoPatch(BuildInfo.scalaVersion)
+    val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File("target" +
+      File.separator + s"scala-${scalaVersionNoPatch}")))
+    // Use libFiles if target is empty.
+    if (targetFiles.size > 0) targetFiles else libFiles
+  }
+
+  def coreAssembly(pioHome: String): Expected[File] = {
+    val core = s"pio-assembly-${BuildInfo.version}.jar"
+    val coreDir =
+      if (new File(pioHome + File.separator + "RELEASE").exists) {
+        new File(pioHome + File.separator + "lib")
+      } else {
+        new File(pioHome + File.separator + "assembly")
+      }
+    val coreFile = new File(coreDir, core)
+    if (coreFile.exists) {
+      Right(coreFile)
+    } else {
+      logAndFail(s"PredictionIO Core Assembly (${coreFile.getCanonicalPath}) 
does " +
+        "not exist. Aborting.")
+    }
+  }
+}
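
The new ReturnTypes aliases let command logic report failure as data rather
than calling sys.exit. A short sketch of how the Either-based types compose
in a for-comprehension (parsePort and checkRange are hypothetical helpers,
not part of this commit):

    import org.apache.predictionio.tools.ReturnTypes._

    object ValidationExample {
      def parsePort(s: String): Expected[Int] =
        try Right(s.toInt) catch {
          case _: NumberFormatException => Left(s"Invalid port: ${s}")
        }

      def checkRange(port: Int): MaybeError =
        if (port > 0 && port < 65536) Success
        else Left(s"Port out of range: ${port}")

      // The first Left short-circuits the remaining steps.
      def validate(s: String): MaybeError = for {
        port <- parsePort(s).right
        ok   <- checkRange(port).right
      } yield ok
    }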

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
index 189e5a8..334981c 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala
@@ -24,6 +24,7 @@ import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.EngineManifest
 import org.apache.predictionio.data.storage.EngineManifestSerializer
 import org.apache.predictionio.data.storage.Storage
+import org.apache.predictionio.tools.ReturnTypes._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
@@ -32,35 +33,34 @@ import org.json4s.native.Serialization.read
 
 import scala.io.Source
 
-object RegisterEngine extends Logging {
+object RegisterEngine extends EitherLogging {
   val engineManifests = Storage.getMetaDataEngineManifests
   implicit val formats = DefaultFormats + new EngineManifestSerializer
 
   def registerEngine(
       jsonManifest: File,
       engineFiles: Seq[File],
-      copyLocal: Boolean = false): Unit = {
+      copyLocal: Boolean = false): MaybeError = {
     val jsonString = try {
       Source.fromFile(jsonManifest).mkString
     } catch {
       case e: java.io.FileNotFoundException =>
-        error(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
-        sys.exit(1)
+        return logAndFail(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
     }
     val engineManifest = read[EngineManifest](jsonString)
 
     info(s"Registering engine ${engineManifest.id} ${engineManifest.version}")
     engineManifests.update(
       engineManifest.copy(files = engineFiles.map(_.toURI.toString)), true)
+    Success
   }
 
-  def unregisterEngine(jsonManifest: File): Unit = {
+  def unregisterEngine(jsonManifest: File): MaybeError = {
     val jsonString = try {
       Source.fromFile(jsonManifest).mkString
     } catch {
       case e: java.io.FileNotFoundException =>
-        error(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
-        sys.exit(1)
+        return logAndFail(s"Engine manifest file not found: ${e.getMessage}. Aborting.")
     }
     val fileEngineManifest = read[EngineManifest](jsonString)
     val engineManifest = engineManifests.get(
@@ -78,9 +78,9 @@ object RegisterEngine extends Logging {
       }
 
       engineManifests.delete(em.id, em.version)
-      info(s"Unregistered engine ${em.id} ${em.version}")
+      logAndSucceed(s"Unregistered engine ${em.id} ${em.version}")
     } getOrElse {
-      error(s"${fileEngineManifest.id} ${fileEngineManifest.version} is not " +
+      logAndFail(s"${fileEngineManifest.id} ${fileEngineManifest.version} is 
not " +
         "registered.")
     }
   }
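
With MaybeError in place, the decision to terminate the JVM moves up to the
console layer. A hedged sketch of a hypothetical caller:

    import java.io.File
    import org.apache.predictionio.tools.RegisterEngine

    RegisterEngine.unregisterEngine(new File("manifest.json")) match {
      case Right(_)  => println("Engine unregistered.")
      case Left(msg) => System.err.println(msg); sys.exit(1)
    }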

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
index d091654..1431432 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala
@@ -23,159 +23,75 @@ import java.net.URI
 
 import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.tools.ReturnTypes._
 import org.apache.predictionio.tools.console.ConsoleArgs
 import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.JsonExtractorOption
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
 
 import scala.sys.process._
 
-object RunServer extends Logging {
-  def runServer(
-      ca: ConsoleArgs,
-      core: File,
-      em: EngineManifest,
-      engineInstanceId: String): Int = {
-    val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
-      s"${kv._1}=${kv._2}"
-    ).mkString(",")
-
-    val sparkHome = ca.common.sparkHome.getOrElse(
-      sys.env.getOrElse("SPARK_HOME", "."))
-
-    val extraFiles = WorkflowUtils.thirdPartyConfFiles
-
-    val driverClassPathIndex =
-      ca.common.sparkPassThrough.indexOf("--driver-class-path")
-    val driverClassPathPrefix =
-      if (driverClassPathIndex != -1) {
-        Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
-      } else {
-        Seq()
-      }
-    val extraClasspaths =
-      driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
+case class DeployArgs(
+  ip: String = "0.0.0.0",
+  port: Int = 8000,
+  logUrl: Option[String] = None,
+  logPrefix: Option[String] = None)
 
-    val deployModeIndex =
-      ca.common.sparkPassThrough.indexOf("--deploy-mode")
-    val deployMode = if (deployModeIndex != -1) {
-      ca.common.sparkPassThrough(deployModeIndex + 1)
-    } else {
-      "client"
-    }
+case class EventServerArgs(
+  enabled: Boolean = false,
+  ip: String = "0.0.0.0",
+  port: Int = 7070,
+  stats: Boolean = false)
 
-    val mainJar =
-      if (ca.build.uberJar) {
-        if (deployMode == "cluster") {
-          em.files.filter(_.startsWith("hdfs")).head
-        } else {
-          em.files.filterNot(_.startsWith("hdfs")).head
-        }
-      } else {
-        if (deployMode == "cluster") {
-          em.files.filter(_.contains("pio-assembly")).head
-        } else {
-          core.getCanonicalPath
-        }
-      }
-
-    val jarFiles = (em.files ++ Option(new File(ca.common.pioHome.get, "plugins")
-      .listFiles()).getOrElse(Array.empty[File]).map(_.getAbsolutePath)).mkString(",")
-
-    val sparkSubmit =
-      Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
-      ca.common.sparkPassThrough ++
-      Seq(
-        "--class",
-        "org.apache.predictionio.workflow.CreateServer",
-        "--name",
-        s"PredictionIO Engine Instance: ${engineInstanceId}") ++
-      (if (!ca.build.uberJar) {
-        Seq("--jars", jarFiles)
-      } else Seq()) ++
-      (if (extraFiles.size > 0) {
-        Seq("--files", extraFiles.mkString(","))
-      } else {
-        Seq()
-      }) ++
-      (if (extraClasspaths.size > 0) {
-        Seq("--driver-class-path", extraClasspaths.mkString(":"))
-      } else {
-        Seq()
-      }) ++
-      (if (ca.common.sparkKryo) {
-        Seq(
-          "--conf",
-          "spark.serializer=org.apache.spark.serializer.KryoSerializer")
-      } else {
-        Seq()
-      }) ++
-      Seq(
-        mainJar,
-        "--engineInstanceId",
-        engineInstanceId,
-        "--ip",
-        ca.deploy.ip,
-        "--port",
-        ca.deploy.port.toString,
-        "--event-server-ip",
-        ca.eventServer.ip,
-        "--event-server-port",
-        ca.eventServer.port.toString) ++
-      (if (ca.accessKey.accessKey != "") {
-        Seq("--accesskey", ca.accessKey.accessKey)
-      } else {
-        Seq()
-      }) ++
-      (if (ca.eventServer.enabled) Seq("--feedback") else Seq()) ++
-      (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
-      (if (ca.common.verbose) Seq("--verbose") else Seq()) ++
-      ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Seq()) ++
-      ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Seq()) ++
-      Seq("--json-extractor", ca.common.jsonExtractor.toString)
+case class ServerArgs(
+  deploy: DeployArgs = DeployArgs(),
+  eventServer: EventServerArgs = EventServerArgs(),
+  batch: String = "",
+  accessKey: String = "",
+  variantJson: File = new File("engine.json"),
+  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
 
-    info(s"Submission command: ${sparkSubmit.mkString(" ")}")
 
-    val proc =
-      Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
-    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
-      def run(): Unit = {
-        proc.destroy()
-      }
-    }))
-    proc.exitValue()
-  }
+object RunServer extends Logging {
 
-  def newRunServer(
-    ca: ConsoleArgs,
+  def runServer(
+    engineInstanceId: String,
+    serverArgs: ServerArgs,
+    sparkArgs: SparkArgs,
     em: EngineManifest,
-    engineInstanceId: String): Int = {
+    pioHome: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
+
     val jarFiles = em.files.map(new URI(_)) ++
-      Option(new File(ca.common.pioHome.get, "plugins").listFiles())
+      Option(new File(pioHome, "plugins").listFiles())
         .getOrElse(Array.empty[File]).map(_.toURI)
     val args = Seq(
       "--engineInstanceId",
       engineInstanceId,
       "--engine-variant",
-      ca.common.variantJson.toURI.toString,
+      serverArgs.variantJson.toURI.toString,
       "--ip",
-      ca.deploy.ip,
+      serverArgs.deploy.ip,
       "--port",
-      ca.deploy.port.toString,
+      serverArgs.deploy.port.toString,
       "--event-server-ip",
-      ca.eventServer.ip,
+      serverArgs.eventServer.ip,
       "--event-server-port",
-      ca.eventServer.port.toString) ++
-      (if (ca.accessKey.accessKey != "") {
-        Seq("--accesskey", ca.accessKey.accessKey)
+      serverArgs.eventServer.port.toString) ++
+      (if (serverArgs.accessKey != "") {
+        Seq("--accesskey", serverArgs.accessKey)
       } else {
         Nil
       }) ++
-      (if (ca.eventServer.enabled) Seq("--feedback") else Nil) ++
-      (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Nil) ++
-      (if (ca.common.verbose) Seq("--verbose") else Nil) ++
-      ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Nil) ++
-      ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Nil) ++
-      Seq("--json-extractor", ca.common.jsonExtractor.toString)
-
-    Runner.runOnSpark("org.apache.predictionio.workflow.CreateServer", args, ca, jarFiles)
+      (if (serverArgs.eventServer.enabled) Seq("--feedback") else Nil) ++
+      (if (serverArgs.batch != "") Seq("--batch", serverArgs.batch) else Nil) ++
+      (if (verbose) Seq("--verbose") else Nil) ++
+      serverArgs.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Nil) ++
+      serverArgs.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Nil) ++
+      Seq("--json-extractor", serverArgs.jsonExtractor.toString)
+
+    Runner.runOnSpark(
+      "org.apache.predictionio.workflow.CreateServer",
+      args, sparkArgs, jarFiles, pioHome, verbose)
   }
 }
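
runServer now takes plain argument case classes instead of the parsed
ConsoleArgs, which makes it callable from any front end. A sketch of
assembling the arguments (all values illustrative):

    import org.apache.predictionio.tools.{DeployArgs, EventServerArgs, ServerArgs}

    val serverArgs = ServerArgs(
      deploy = DeployArgs(ip = "0.0.0.0", port = 8000),
      eventServer = EventServerArgs(enabled = true, port = 7070),
      accessKey = "my-access-key",
      batch = "demo")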

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
index 097eb83..8b8d769 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala
@@ -24,156 +24,39 @@ import java.net.URI
 import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.EngineManifest
 import org.apache.predictionio.tools.console.ConsoleArgs
+import org.apache.predictionio.tools.ReturnTypes._
 import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.workflow.JsonExtractorOption
+import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
 import scala.sys.process._
 
-object RunWorkflow extends Logging {
-  def runWorkflow(
-      ca: ConsoleArgs,
-      core: File,
-      em: EngineManifest,
-      variantJson: File): Int = {
-    // Collect and serialize PIO_* environmental variables
-    val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv =>
-      s"${kv._1}=${kv._2}"
-    ).mkString(",")
-
-    val sparkHome = ca.common.sparkHome.getOrElse(
-      sys.env.getOrElse("SPARK_HOME", "."))
-
-    val hadoopConf = new Configuration
-    val hdfs = FileSystem.get(hadoopConf)
-
-    val driverClassPathIndex =
-      ca.common.sparkPassThrough.indexOf("--driver-class-path")
-    val driverClassPathPrefix =
-      if (driverClassPathIndex != -1) {
-        Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1))
-      } else {
-        Seq()
-      }
-    val extraClasspaths =
-      driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths
-
-    val deployModeIndex =
-      ca.common.sparkPassThrough.indexOf("--deploy-mode")
-    val deployMode = if (deployModeIndex != -1) {
-      ca.common.sparkPassThrough(deployModeIndex + 1)
-    } else {
-      "client"
-    }
-
-    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+case class WorkflowArgs(
+  batch: String = "",
+  variantJson: File = new File("engine.json"),
+  verbosity: Int = 0,
+  engineParamsKey: Option[String] = None,
+  engineFactory: Option[String] = None,
+  evaluation: Option[String] = None,
+  engineParamsGenerator: Option[String] = None,
+  stopAfterRead: Boolean = false,
+  stopAfterPrepare: Boolean = false,
+  skipSanityCheck: Boolean = false,
+  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both)
 
-    val mainJar =
-      if (ca.build.uberJar) {
-        if (deployMode == "cluster") {
-          em.files.filter(_.startsWith("hdfs")).head
-        } else {
-          em.files.filterNot(_.startsWith("hdfs")).head
-        }
-      } else {
-        if (deployMode == "cluster") {
-          em.files.filter(_.contains("pio-assembly")).head
-        } else {
-          core.getCanonicalPath
-        }
-      }
-
-    val workMode =
-      ca.common.evaluation.map(_ => "Evaluation").getOrElse("Training")
-
-    val engineLocation = Seq(
-      sys.env("PIO_FS_ENGINESDIR"),
-      em.id,
-      em.version)
-
-    if (deployMode == "cluster") {
-      val dstPath = new Path(engineLocation.mkString(Path.SEPARATOR))
-      info("Cluster deploy mode detected. Trying to copy " +
-        s"${variantJson.getCanonicalPath} to " +
-        s"${hdfs.makeQualified(dstPath).toString}.")
-      hdfs.copyFromLocalFile(new Path(variantJson.toURI), dstPath)
-    }
-
-    val sparkSubmit =
-      Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++
-      ca.common.sparkPassThrough ++
-      Seq(
-        "--class",
-        "org.apache.predictionio.workflow.CreateWorkflow",
-        "--name",
-        s"PredictionIO $workMode: ${em.id} ${em.version} 
(${ca.common.batch})") ++
-      (if (!ca.build.uberJar) {
-        Seq("--jars", em.files.mkString(","))
-      } else Seq()) ++
-      (if (extraFiles.size > 0) {
-        Seq("--files", extraFiles.mkString(","))
-      } else {
-        Seq()
-      }) ++
-      (if (extraClasspaths.size > 0) {
-        Seq("--driver-class-path", extraClasspaths.mkString(":"))
-      } else {
-        Seq()
-      }) ++
-      (if (ca.common.sparkKryo) {
-        Seq(
-          "--conf",
-          "spark.serializer=org.apache.spark.serializer.KryoSerializer")
-      } else {
-        Seq()
-      }) ++
-      Seq(
-        mainJar,
-        "--env",
-        pioEnvVars,
-        "--engine-id",
-        em.id,
-        "--engine-version",
-        em.version,
-        "--engine-variant",
-        if (deployMode == "cluster") {
-          hdfs.makeQualified(new Path(
-            (engineLocation :+ variantJson.getName).mkString(Path.SEPARATOR))).
-            toString
-        } else {
-          variantJson.getCanonicalPath
-        },
-        "--verbosity",
-        ca.common.verbosity.toString) ++
-      ca.common.engineFactory.map(
-        x => Seq("--engine-factory", x)).getOrElse(Seq()) ++
-      ca.common.engineParamsKey.map(
-        x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++
-      (if (deployMode == "cluster") Seq("--deploy-mode", "cluster") else Seq()) ++
-      (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
-      (if (ca.common.verbose) Seq("--verbose") else Seq()) ++
-      (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++
-      (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++
-      (if (ca.common.stopAfterPrepare) {
-        Seq("--stop-after-prepare")
-      } else {
-        Seq()
-      }) ++
-      ca.common.evaluation.map(x => Seq("--evaluation-class", x)).
-        getOrElse(Seq()) ++
-      // If engineParamsGenerator is specified, it overrides the evaluation.
-      ca.common.engineParamsGenerator.orElse(ca.common.evaluation)
-        .map(x => Seq("--engine-params-generator-class", x))
-        .getOrElse(Seq()) ++
-      (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
-      Seq("--json-extractor", ca.common.jsonExtractor.toString)
+object RunWorkflow extends Logging {
 
-    info(s"Submission command: ${sparkSubmit.mkString(" ")}")
-    Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).!
-  }
+  def runWorkflow(
+    wa: WorkflowArgs,
+    sa: SparkArgs,
+    em: EngineManifest,
+    pioHome: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
 
-  def newRunWorkflow(ca: ConsoleArgs, em: EngineManifest): Int = {
     val jarFiles = em.files.map(new URI(_))
     val args = Seq(
       "--engine-id",
@@ -181,35 +64,37 @@ object RunWorkflow extends Logging {
       "--engine-version",
       em.version,
       "--engine-variant",
-      ca.common.variantJson.toURI.toString,
+      wa.variantJson.toURI.toString,
       "--verbosity",
-      ca.common.verbosity.toString) ++
-      ca.common.engineFactory.map(
+      wa.verbosity.toString) ++
+      wa.engineFactory.map(
         x => Seq("--engine-factory", x)).getOrElse(Seq()) ++
-      ca.common.engineParamsKey.map(
+      wa.engineParamsKey.map(
         x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++
-      (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
-      (if (ca.common.verbose) Seq("--verbose") else Seq()) ++
-      (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++
-      (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++
-      (if (ca.common.stopAfterPrepare) {
+      (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++
+      (if (verbose) Seq("--verbose") else Seq()) ++
+      (if (wa.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++
+      (if (wa.stopAfterRead) Seq("--stop-after-read") else Seq()) ++
+      (if (wa.stopAfterPrepare) {
         Seq("--stop-after-prepare")
       } else {
         Seq()
       }) ++
-      ca.common.evaluation.map(x => Seq("--evaluation-class", x)).
+      wa.evaluation.map(x => Seq("--evaluation-class", x)).
         getOrElse(Seq()) ++
       // If engineParamsGenerator is specified, it overrides the evaluation.
-      ca.common.engineParamsGenerator.orElse(ca.common.evaluation)
+      wa.engineParamsGenerator.orElse(wa.evaluation)
         .map(x => Seq("--engine-params-generator-class", x))
         .getOrElse(Seq()) ++
-      (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++
-      Seq("--json-extractor", ca.common.jsonExtractor.toString)
+      (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++
+      Seq("--json-extractor", wa.jsonExtractor.toString)
 
     Runner.runOnSpark(
       "org.apache.predictionio.workflow.CreateWorkflow",
       args,
-      ca,
-      jarFiles)
+      sa,
+      jarFiles,
+      pioHome,
+      verbose)
   }
 }
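
The same pattern applies to training: runWorkflow takes WorkflowArgs and
SparkArgs and returns the spark-submit Process plus a cleanup thunk instead
of an exit code. A hedged sketch (the manifest value is assumed to be an
EngineManifest looked up from metadata storage beforehand):

    import org.apache.predictionio.tools.{RunWorkflow, SparkArgs, WorkflowArgs}

    val result = RunWorkflow.runWorkflow(
      WorkflowArgs(batch = "nightly-train"),
      SparkArgs(sparkKryo = true),
      manifest,                      // assumed: fetched from metadata storage
      pioHome = "/opt/PredictionIO")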

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
index b3ec51c..d9752df 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala
@@ -21,16 +21,22 @@ package org.apache.predictionio.tools
 import java.io.File
 import java.net.URI
 
-import grizzled.slf4j.Logging
 import org.apache.predictionio.tools.console.ConsoleArgs
 import org.apache.predictionio.workflow.WorkflowUtils
+import org.apache.predictionio.tools.ReturnTypes._
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
 import scala.sys.process._
 
-object Runner extends Logging {
+case class SparkArgs(
+  sparkHome: Option[String] = None,
+  sparkPassThrough: Seq[String] = Seq(),
+  sparkKryo: Boolean = false,
+  scratchUri: Option[URI] = None)
+
+object Runner extends EitherLogging {
   def envStringToMap(env: String): Map[String, String] =
     env.split(',').flatMap(p =>
       p.split('=') match {
@@ -95,26 +101,27 @@ object Runner extends Logging {
   def runOnSpark(
       className: String,
       classArgs: Seq[String],
-      ca: ConsoleArgs,
-      extraJars: Seq[URI]): Int = {
+      sa: SparkArgs,
+      extraJars: Seq[URI],
+      pioHome: String,
+      verbose: Boolean = false): Expected[(Process, () => Unit)] = {
     // Return error for unsupported cases
     val deployMode =
-      argumentValue(ca.common.sparkPassThrough, "--deploy-mode").getOrElse("client")
+      argumentValue(sa.sparkPassThrough, "--deploy-mode").getOrElse("client")
     val master =
-      argumentValue(ca.common.sparkPassThrough, "--master").getOrElse("local")
+      argumentValue(sa.sparkPassThrough, "--master").getOrElse("local")
 
-    (ca.common.scratchUri, deployMode, master) match {
+    (sa.scratchUri, deployMode, master) match {
       case (Some(u), "client", m) if m != "yarn-cluster" =>
-        error("--scratch-uri cannot be set when deploy mode is client")
-        return 1
+        return logAndFail("--scratch-uri cannot be set when deploy mode is 
client")
       case (_, "cluster", m) if m.startsWith("spark://") =>
-        error("Using cluster deploy mode with Spark standalone cluster is not 
supported")
-        return 1
+        return logAndFail(
+          "Using cluster deploy mode with Spark standalone cluster is not 
supported")
       case _ => Unit
     }
 
     // Initialize HDFS API for scratch URI
-    val fs = ca.common.scratchUri map { uri =>
+    val fs = sa.scratchUri map { uri =>
       FileSystem.get(uri, new Configuration())
     }
 
@@ -124,18 +131,18 @@ object Runner extends Logging {
     ).mkString(",")
 
     // Location of Spark
-    val sparkHome = ca.common.sparkHome.getOrElse(
+    val sparkHome = sa.sparkHome.getOrElse(
       sys.env.getOrElse("SPARK_HOME", "."))
 
     // Local path to PredictionIO assembly JAR
-    val mainJar = handleScratchFile(
-      fs,
-      ca.common.scratchUri,
-      console.Console.coreAssembly(ca.common.pioHome.get))
+    val mainJar = Common.coreAssembly(pioHome) fold(
+        errStr => return Left(errStr),
+        assembly => handleScratchFile(fs, sa.scratchUri, assembly)
+      )
 
     // Extra JARs that are needed by the driver
     val driverClassPathPrefix =
-      argumentValue(ca.common.sparkPassThrough, "--driver-class-path") map { v =>
+      argumentValue(sa.sparkPassThrough, "--driver-class-path") map { v =>
         Seq(v)
       } getOrElse {
         Nil
@@ -146,11 +153,11 @@ object Runner extends Logging {
 
     // Extra files that are needed to be passed to --files
     val extraFiles = WorkflowUtils.thirdPartyConfFiles map { f =>
-      handleScratchFile(fs, ca.common.scratchUri, new File(f))
+      handleScratchFile(fs, sa.scratchUri, new File(f))
     }
 
     val deployedJars = extraJars map { j =>
-      handleScratchFile(fs, ca.common.scratchUri, new File(j))
+      handleScratchFile(fs, sa.scratchUri, new File(j))
     }
 
     val sparkSubmitCommand =
@@ -174,7 +181,7 @@ object Runner extends Logging {
       Nil
     }
 
-    val sparkSubmitKryo = if (ca.common.sparkKryo) {
+    val sparkSubmitKryo = if (sa.sparkKryo) {
       Seq(
         "--conf",
         "spark.serializer=org.apache.spark.serializer.KryoSerializer")
@@ -182,33 +189,26 @@ object Runner extends Logging {
       Nil
     }
 
-    val verbose = if (ca.common.verbose) Seq("--verbose") else Nil
+    val verboseArg = if (verbose) Seq("--verbose") else Nil
 
     val sparkSubmit = Seq(
       sparkSubmitCommand,
-      ca.common.sparkPassThrough,
+      sa.sparkPassThrough,
       Seq("--class", className),
       sparkSubmitJars,
       sparkSubmitFiles,
       sparkSubmitExtraClasspaths,
       sparkSubmitKryo,
       Seq(mainJar),
-      detectFilePaths(fs, ca.common.scratchUri, classArgs),
+      detectFilePaths(fs, sa.scratchUri, classArgs),
       Seq("--env", pioEnvVars),
-      verbose).flatten.filter(_ != "")
+      verboseArg).flatten.filter(_ != "")
     info(s"Submission command: ${sparkSubmit.mkString(" ")}")
     val proc = Process(
       sparkSubmit,
       None,
       "CLASSPATH" -> "",
       "SPARK_YARN_USER_ENV" -> pioEnvVars).run()
-    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
-      def run(): Unit = {
-        cleanup(fs, ca.common.scratchUri)
-        proc.destroy()
-      }
-    }))
-    cleanup(fs, ca.common.scratchUri)
-    proc.exitValue()
+    Right((proc, () => cleanup(fs, sa.scratchUri)))
   }
 }
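
Note that runOnSpark no longer installs its own shutdown hook or blocks on
exitValue; it returns the running Process together with a cleanup function
for the scratch URI. A sketch of the caller-side handling this implies
(hypothetical caller; className, classArgs, sparkArgs, jarUris, and pioHome
are assumed to be in scope):

    Runner.runOnSpark(className, classArgs, sparkArgs, jarUris, pioHome) match {
      case Left(err) =>
        System.err.println(err)
      case Right((proc, cleanup)) =>
        try {
          // Block until spark-submit finishes, then surface its exit code.
          val code = proc.exitValue()
          println(s"spark-submit exited with ${code}")
        } finally cleanup()
    }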

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
index f53d84c..bbe39a5 100644
--- a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
+++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala
@@ -133,7 +133,7 @@ case class AdminServerConfig(
 )
 
 object AdminServer {
-  def createAdminServer(config: AdminServerConfig): Unit = {
+  def createAdminServer(config: AdminServerConfig): ActorSystem = {
     implicit val system = ActorSystem("AdminServerSystem")
 
     val commandClient = new CommandClient(
@@ -146,7 +146,7 @@ object AdminServer {
       Props(classOf[AdminServerActor], commandClient),
       "AdminServerActor")
     serverActor ! StartServer(config.ip, config.port)
-    system.awaitTermination
+    system
   }
 }
 
@@ -155,5 +155,6 @@ object AdminRun {
     AdminServer.createAdminServer(AdminServerConfig(
       ip = "localhost",
       port = 7071))
+    .awaitTermination
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala
new file mode 100644
index 0000000..715cf8f
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.data.storage
+import org.apache.predictionio.tools
+import org.apache.predictionio.tools.EitherLogging
+import org.apache.predictionio.tools.ReturnTypes._
+
+import grizzled.slf4j.Logging
+import scala.util.Either
+
+object AccessKey extends EitherLogging {
+
+  def create(
+    appName: String,
+    key: String,
+    events: Seq[String]): Expected[storage.AccessKey] = {
+
+    val apps = storage.Storage.getMetaDataApps
+    apps.getByName(appName) map { app =>
+      val accessKeys = storage.Storage.getMetaDataAccessKeys
+      val newKey = storage.AccessKey(
+        key = key,
+        appid = app.id,
+        events = events)
+      accessKeys.insert(newKey) map { k =>
+        info(s"Created new access key: ${k}")
+        Right(newKey.copy(key = k))
+      } getOrElse {
+        logAndFail(s"Unable to create new access key.")
+      }
+    } getOrElse {
+      logAndFail(s"App ${appName} does not exist. Aborting.")
+    }
+  }
+
+  def list(app: Option[String]): Expected[Seq[storage.AccessKey]] =
+    app map { appName =>
+      App.show(appName).right map { appChansPair => appChansPair._1.keys }
+    } getOrElse {
+      Right(storage.Storage.getMetaDataAccessKeys.getAll)
+    }
+
+  def delete(key: String): MaybeError = {
+    try {
+      storage.Storage.getMetaDataAccessKeys.delete(key)
+      logAndSucceed(s"Deleted access key ${key}.")
+    } catch {
+      case e: Exception =>
+        error(s"Error deleting access key ${key}.", e)
+        Left(s"Error deleting access key ${key}.")
+    }
+  }
+}
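
A quick sketch of calling the extracted access key commands directly (the
app name and event list are illustrative; passing an empty key assumes the
storage layer generates one, as the insert-and-copy in create suggests):

    import org.apache.predictionio.tools.commands.AccessKey

    AccessKey.create(appName = "MyApp", key = "", events = Seq("rate", "buy")) match {
      case Right(k)  => println(s"Created access key: ${k.key}")
      case Left(msg) => System.err.println(msg)
    }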

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala
new file mode 100644
index 0000000..44fa667
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.data.storage
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.tools.EitherLogging
+import org.apache.predictionio.tools.ReturnTypes._
+
+sealed case class AppDescription(
+  app: storage.App,
+  keys: Seq[storage.AccessKey])
+
+object App extends EitherLogging {
+
+  def create(
+    name: String,
+    id: Option[Int] = None,
+    description: Option[String] = None,
+    accessKey: String = "") : Expected[AppDescription] = {
+
+    val apps = storage.Storage.getMetaDataApps()
+    // get the client at the beginning so we fail fast if the client cannot be accessed
+    val events = storage.Storage.getLEvents()
+    var errStr = ""
+
+    apps.getByName(name) map { app =>
+      errStr = s"App ${name} already exists. Aborting."
+      error(errStr)
+      errStr
+    } orElse {
+      id.flatMap { id =>
+        apps.get(id) map { app =>
+          errStr = s"App ID ${id} already exists and maps to the app 
'${app.name}'. " +
+            "Aborting."
+          error(errStr)
+          errStr
+        }
+      }
+    } map {err => Left(err)} getOrElse {
+      val newApp = storage.App(
+        id = id.getOrElse(0),
+        name = name,
+        description = description)
+      val appid = apps.insert(newApp)
+
+      appid map { id =>
+        val dbInit = events.init(id)
+        val r = if (dbInit) {
+          info(s"Initialized Event Store for this app ID: ${id}.")
+          val accessKeys = storage.Storage.getMetaDataAccessKeys
+          val newKey = storage.AccessKey(
+            key = accessKey,
+            appid = id,
+            events = Seq())
+          accessKeys.insert(newKey)
+          .map { k =>
+            Right(AppDescription(
+              app = newApp.copy(id = id),
+              keys = Seq(newKey.copy(key = k))))
+          } getOrElse {
+            logAndFail(s"Unable to create new access key.")
+          }
+        } else {
+          errStr = s"Unable to initialize Event Store for this app ID: ${id}."
+          try {
+            apps.delete(id)
+          } catch {
+            case e: Exception =>
+              errStr += s"""
+                |Failed to revert back the App meta-data change.
+                |The app ${name} CANNOT be used!
+                |Please run 'pio app delete ${name}' to delete this app!"""
+          }
+          logAndFail(errStr)
+        }
+        events.close()
+        r
+      } getOrElse {
+        logAndFail(s"Unable to create new app.")
+      }
+    }
+  }
+
+  def list: Seq[AppDescription] = {
+    val apps = storage.Storage.getMetaDataApps.getAll().sortBy(_.name)
+    val accessKeys = storage.Storage.getMetaDataAccessKeys
+
+    apps map { app =>
+      AppDescription(
+        app = app,
+        keys = accessKeys.getByAppid(app.id))
+    }
+  }
+
+  def show(appName: String): Expected[(AppDescription, Seq[Channel])] = {
+    val apps = storage.Storage.getMetaDataApps
+    val accessKeys = storage.Storage.getMetaDataAccessKeys
+    val channels = storage.Storage.getMetaDataChannels
+
+    apps.getByName(appName) map { app =>
+      Right(
+        (AppDescription(
+          app = app,
+          keys = accessKeys.getByAppid(app.id)),
+        channels.getByAppid(app.id))
+      )
+    } getOrElse {
+      logAndFail(s"App ${appName} does not exist. Aborting.")
+    }
+  }
+
+  def delete(name: String): MaybeError = {
+    val events = storage.Storage.getLEvents()
+    try {
+      show(name).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) =>
+
+        val delChannelStatus: MaybeError =
+          channels.map { ch =>
+            if (events.remove(appDesc.app.id, Some(ch.id))) {
+              info(s"Removed Event Store of the channel ID: ${ch.id}")
+              try {
+                storage.Storage.getMetaDataChannels.delete(ch.id)
+                info(s"Deleted channel ${ch.name}")
+                None
+              } catch {
+                case e: Exception =>
+                  val errStr = s"Error deleting channel ${ch.name}."
+                  error(errStr, e)
+                  Some(errStr)
+              }
+              } else {
+                val errStr = s"Error removing Event Store of the channel ID: ${ch.id}."
+                error(errStr)
+                Some(errStr)
+              }
+          }
+          .flatten
+          .reduceOption(_ + "\n" + _)
+          .map(Left(_)) getOrElse Success
+
+          if (delChannelStatus.isLeft) {
+            return delChannelStatus
+          }
+
+          try {
+            events.remove(appDesc.app.id)
+            info(s"Removed Event Store for this app ID: ${appDesc.app.id}")
+          } catch {
+            case e: Exception =>
+              logAndFail(s"Error removing Event Store for this app. Aborting.")
+          }
+
+          appDesc.keys foreach { key =>
+            try {
+              storage.Storage.getMetaDataAccessKeys.delete(key.key)
+              info(s"Removed access key ${key.key}")
+            } catch {
+              case e: Exception =>
+                logAndFail(s"Error removing access key ${key.key}. Aborting.")
+            }
+          }
+
+          try {
+            storage.Storage.getMetaDataApps.delete(appDesc.app.id)
+            info(s"Deleted app ${appDesc.app.name}.")
+          } catch {
+            case e: Exception =>
+              logAndFail(s"Error deleting app ${appDesc.app.name}. Aborting.")
+          }
+          logAndSucceed("Done.")
+      }
+
+    } finally {
+      events.close()
+    }
+  }
+
+  def dataDelete(
+    name: String,
+    channel: Option[String] = None,
+    all: Boolean = false): MaybeError = {
+
+    var errStr = ""
+    val events = storage.Storage.getLEvents()
+    try {
+      show(name).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) =>
+
+        val chanIdsToRemove: Seq[Option[Int]] =
+          if (all) {
+            channels.map(ch => Some(ch.id)) :+ None // remove default channel too
+          } else {
+            channel.map { chName =>
+              channels.find(ch => ch.name == chName) match {
+                case None =>
+                  return logAndFail(s"""Unable to delete data for channel.
+                              |Channel ${chName} doesn't exist.""")
+                case Some(ch) => Seq(Some(ch.id))
+              }
+              } getOrElse {
+                Seq(None) // for default channel
+              }
+          }
+
+        chanIdsToRemove.map { chId: Option[Int] =>
+
+          val r1 = if (events.remove(appDesc.app.id, chId)) {
+            if (chId.isDefined) {
+              info(s"Removed Event Store for the channel ID: ${chId.get}")
+            } else {
+              info(s"Removed Event Store for the app ID: ${appDesc.app.id}")
+            }
+            None
+          } else {
+            errStr =
+              if (chId.isDefined) s"Error removing Event Store for the channel 
ID: ${chId.get}."
+              else s"Error removing Event Store for the app ID: 
${appDesc.app.id}."
+            error(errStr)
+            Some(errStr)
+          }
+          // re-create table
+          val dbInit = events.init(appDesc.app.id, chId)
+          val r2 = if (dbInit) {
+            if (chId.isDefined) {
+              info(s"Initialized Event Store for the channel ID: ${chId.get}")
+            } else {
+              info(s"Initialized Event Store for the app ID: 
${appDesc.app.id}")
+            }
+            None
+          } else {
+            errStr =
+              if (chId.isDefined) {
+                s"Unable to initialize Event Store for the channel ID: 
${chId.get}."
+              }
+              else {
+                s"Unable to initialize Event tore for the app ID: 
${appDesc.app.id}."
+              }
+            error(errStr)
+            Some(errStr)
+          }
+          Seq(r1, r2)
+        }
+        .flatten.flatten
+        .reduceOption(_ + "\n" + _)
+        .toLeft(Ok())
+      }
+    } finally {
+      events.close()
+    }
+  }
+
+  def channelNew(appName: String, newChannel: String): Expected[Channel] = {
+    val events = storage.Storage.getLEvents()
+    val chanStorage = storage.Storage.getMetaDataChannels
+    var errStr = ""
+    try {
+      show(appName).right flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) =>
+        if (channels.find(ch => ch.name == newChannel).isDefined) {
+          logAndFail(s"""Channel ${newChannel} already exists.
+                      |Unable to create new channel.""")
+        } else if (!storage.Channel.isValidName(newChannel)) {
+          logAndFail(s"""Unable to create new channel.
+                      |The channel name ${newChannel} is invalid.
+                      |${storage.Channel.nameConstraint}""")
+        } else {
+
+          val channel = Channel(
+            id = 0,
+            appid = appDesc.app.id,
+            name = newChannel)
+
+          chanStorage.insert(channel) map { chanId =>
+
+            info(s"Updated Channel meta-data.")
+
+            // initialize storage
+            val dbInit = events.init(appDesc.app.id, Some(chanId))
+            if (dbInit) {
+              info(s"Initialized Event Store for the channel: ${newChannel}.")
+              info(s"Created new channel:")
+              info(s"    Channel Name: ${newChannel}")
+              info(s"      Channel ID: ${chanId}")
+              info(s"          App ID: ${appDesc.app.id}")
+              Right(channel.copy(id = chanId))
+            } else {
+              errStr = s"""Unable to create new channel.
+                          |Failed to initialize Event Store."""
+              error(errStr)
+              // revert the meta-data change
+              try {
+                chanStorage.delete(chanId)
+                Left(errStr)
+              } catch {
+                case e: Exception =>
+                  val nextErrStr = s"""
+                    |Failed to revert back the Channel meta-data change.
+                    |The channel ${newChannel} CANNOT be used!
+                    |Please run 'pio app channel-delete ${appName} ${newChannel}'""" +
+                    " to delete this channel!"
+                  logAndFail(errStr + nextErrStr)
+              }
+            }
+          } getOrElse {
+            logAndFail(s"""Unable to create new channel.
+                          |Failed to update Channel meta-data.""")
+          }
+        }
+      }
+    } finally {
+      events.close()
+    }
+  }
+
+  def channelDelete(appName: String, deleteChannel: String): MaybeError = {
+    val chanStorage = storage.Storage.getMetaDataChannels
+    val events = storage.Storage.getLEvents()
+    var errStr = ""
+    try {
+      show(appName).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) =>
+        val foundChannel = channels.find(ch => ch.name == deleteChannel)
+        if (foundChannel.isEmpty) {
+          logAndFail(s"""Unable to delete channel
+                        |Channel ${deleteChannel} doesn't exists.""")
+        } else {
+          val chId = foundChannel.get.id
+          val dbRemoved = events.remove(appDesc.app.id, Some(chId))
+          if (dbRemoved) {
+            info(s"Removed Event Store for this channel: ${deleteChannel}")
+            try {
+              chanStorage.delete(chId)
+              logAndSucceed(s"Deleted channel: ${deleteChannel}.")
+            } catch {
+              case e: Exception =>
+                logAndFail(s"""Unable to delete channel.
+                  |Failed to update Channel meta-data.
+                  |The channel ${deleteChannel} CANNOT be used!
+                  |Please run 'pio app channel-delete ${appDesc.app.name} ${deleteChannel}'""" +
+                  " to delete this channel again!")
+            }
+          } else {
+            logAndFail(s"""Unable to delete channel.
+                          |Error removing Event Store for this channel.""")
+          }
+        }
+      }
+    } finally {
+      events.close()
+    }
+  }
+}
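
The app commands follow the same shape: structured results on the Right,
error strings on the Left. A hedged sketch of driving them programmatically
(names and values illustrative):

    import org.apache.predictionio.tools.commands.App

    App.create(name = "MyApp", description = Some("demo app")) match {
      case Right(desc) =>
        println(s"Created app ${desc.app.name} (ID ${desc.app.id}) " +
          s"with access key ${desc.keys.head.key}")
      case Left(msg) => System.err.println(msg)
    }

    // Enumerate all apps with their keys.
    App.list.foreach(d => println(s"${d.app.name}: ${d.keys.size} key(s)"))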

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
new file mode 100644
index 0000000..6fd8977
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.core.BuildInfo
+import org.apache.predictionio.controller.Utils
+import org.apache.predictionio.data.storage
+import org.apache.predictionio.data.storage.EngineManifest
+import org.apache.predictionio.data.storage.EngineManifestSerializer
+import org.apache.predictionio.tools.RegisterEngine
+import org.apache.predictionio.tools.EitherLogging
+import org.apache.predictionio.tools.{RunWorkflow, RunServer}
+import org.apache.predictionio.tools.{DeployArgs, WorkflowArgs, SparkArgs, ServerArgs}
+import org.apache.predictionio.tools.ReturnTypes._
+import org.apache.predictionio.tools.Common._
+import org.apache.predictionio.workflow.WorkflowUtils
+
+import org.apache.commons.io.FileUtils
+import org.json4s.native.Serialization.read
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+import scala.io.Source
+import scala.util.Random
+import scala.collection.JavaConversions._
+import scala.sys.process._
+import scalaj.http.Http
+import java.io.File
+
+case class BuildArgs(
+  sbt: Option[File] = None,
+  sbtExtra: Option[String] = None,
+  sbtAssemblyPackageDependency: Boolean = true,
+  sbtClean: Boolean = false,
+  uberJar: Boolean = false,
+  forceGeneratePIOSbt: Boolean = false)
+
+case class EngineArgs(
+  manifestJson: File = new File("manifest.json"),
+  engineId: Option[String] = None,
+  engineVersion: Option[String] = None)
+
+object Engine extends EitherLogging {
+
+  private val manifestAutogenTag = "pio-autogen-manifest"
+
+  private def readManifestJson(json: File): Expected[EngineManifest] = {
+    implicit val formats = Utils.json4sDefaultFormats +
+      new EngineManifestSerializer
+    try {
+      Right(read[EngineManifest](Source.fromFile(json).mkString))
+    } catch {
+      case e: java.io.FileNotFoundException =>
+        logAndFail(s"${json.getCanonicalPath} does not exist. Aborting.")
+      case e: MappingException =>
+        logAndFail(s"${json.getCanonicalPath} has invalid content: " +
+          e.getMessage)
+    }
+  }
+
+  private def withRegisteredManifest[T](ea: EngineArgs)(
+      op: EngineManifest => Expected[T]): Expected[T] = {
+    val res: Expected[Expected[T]] = for {
+      ej <- readManifestJson(ea.manifestJson).right
+      id <- Right(ea.engineId getOrElse ej.id).right
+      version <- Right(ea.engineVersion getOrElse ej.version).right
+      manifest <- storage.Storage.getMetaDataEngineManifests.get(id, version)
+        .toRight {
+          val errStr =
+            s"""Engine ${id} ${version} cannot be found in the system.")
+                |Possible reasons:
+                |- the engine is not yet built by the 'build' command;
+                |- the meta data store is offline."""
+          error(errStr)
+          errStr
+        }.right
+    } yield {
+      op(manifest)
+    }
+    res.joinRight
+  }
+
+  private def generateManifestJson(json: File): MaybeError = {
+    val cwd = sys.props("user.dir")
+    implicit val formats = Utils.json4sDefaultFormats +
+      new EngineManifestSerializer
+    val rand = Random.alphanumeric.take(32).mkString
+    val ha = java.security.MessageDigest.getInstance("SHA-1").
+      digest(cwd.getBytes).map("%02x".format(_)).mkString
+    val em = EngineManifest(
+      id = rand,
+      version = ha,
+      name = new File(cwd).getName,
+      description = Some(manifestAutogenTag),
+      files = Seq(),
+      engineFactory = "")
+    try {
+      FileUtils.writeStringToFile(json, write(em), "ISO-8859-1")
+      Success
+    } catch {
+      case e: java.io.IOException =>
+        logAndFail(s"Cannot generate ${json} automatically (${e.getMessage}). 
" +
+          "Aborting.")
+    }
+  }
+
+  private def regenerateManifestJson(json: File): MaybeError = {
+    val cwd = sys.props("user.dir")
+    val ha = java.security.MessageDigest.getInstance("SHA-1").
+      digest(cwd.getBytes).map("%02x".format(_)).mkString
+    if (json.exists) {
+      readManifestJson(json).right.flatMap { em =>
+        if (em.description == Some(manifestAutogenTag) && ha != em.version) {
+          warn("This engine project directory contains an auto-generated " +
+            "manifest that has been copied/moved from another location. ")
+          warn("Regenerating the manifest to reflect the updated location. " +
+            "This will dissociate with all previous engine instances.")
+          generateManifestJson(json)
+        } else {
+          logAndSucceed(s"Using existing engine manifest JSON at " +
+            "${json.getCanonicalPath}")
+        }
+      }
+    } else {
+      generateManifestJson(json)
+    }
+  }
+
+  private def detectSbt(sbt: Option[File], pioHome: String): String = {
+    sbt map {
+      _.getCanonicalPath
+    } getOrElse {
+      val f = new File(Seq(pioHome, "sbt", "sbt").mkString(File.separator))
+      if (f.exists) f.getCanonicalPath else "sbt"
+    }
+  }
+
+  private def outputSbtError(line: String): Unit = {
+    """\[.*error.*\]""".r findFirstIn line foreach { _ => error(line) }
+  }
+
+  private def compile(
+    buildArgs: BuildArgs, pioHome: String, verbose: Boolean): MaybeError = {
+    // only add pioVersion to pio.sbt if project/pio-build.sbt exists
+    if (new File("project", "pio-build.sbt").exists || buildArgs.forceGeneratePIOSbt) {
+      FileUtils.writeLines(
+        new File("pio.sbt"),
+        Seq(
+          "// Generated automatically by pio build.",
+          "// Changes in this file will be overridden.",
+          "",
+          "pioVersion := \"" + BuildInfo.version + "\""))
+    }
+    implicit val formats = Utils.json4sDefaultFormats
+
+    val sbt = detectSbt(buildArgs.sbt, pioHome)
+    info(s"Using command '${sbt}' at the current working directory to build.")
+    info("If the path above is incorrect, this process will fail.")
+    val asm =
+      if (buildArgs.sbtAssemblyPackageDependency) {
+        " assemblyPackageDependency"
+      } else {
+        ""
+      }
+    val clean = if (buildArgs.sbtClean) " clean" else ""
+    val buildCmd = s"${sbt} ${buildArgs.sbtExtra.getOrElse("")}${clean} " +
+      (if (buildArgs.uberJar) "assembly" else s"package${asm}")
+    val core = new File(s"pio-assembly-${BuildInfo.version}.jar")
+    if (buildArgs.uberJar) {
+      info(s"Uber JAR enabled. Putting ${core.getName} in lib.")
+      val dst = new File("lib")
+      dst.mkdir()
+      coreAssembly(pioHome) match {
+        case Right(coreFile) =>
+          FileUtils.copyFileToDirectory(
+            coreFile,
+            dst,
+            true)
+        case Left(errStr) => return Left(errStr)
+      }
+    } else {
+      if (new File("engine.json").exists()) {
+        info(s"Uber JAR disabled. Making sure lib/${core.getName} is absent.")
+        new File("lib", core.getName).delete()
+      } else {
+        info("Uber JAR disabled, but current working directory does not look " 
+
+          s"like an engine project directory. Please delete 
lib/${core.getName} manually.")
+      }
+    }
+    info(s"Going to run: ${buildCmd}")
+    try {
+      val r =
+        if (verbose) {
+          buildCmd.!(ProcessLogger(line => info(line), line => error(line)))
+        } else {
+          buildCmd.!(ProcessLogger(
+            line => outputSbtError(line),
+            line => outputSbtError(line)))
+        }
+      if (r != 0) {
+        logAndFail(s"Return code of build command: ${buildCmd} is ${r}. 
Aborting.")
+      } else {
+        logAndSucceed("Compilation finished successfully.")
+      }
+    } catch {
+      case e: java.io.IOException =>
+        logAndFail(s"Exception during compilation: ${e.getMessage}")
+    }
+  }
+
+  def build(
+    buildArgs: BuildArgs,
+    pioHome: String,
+    manifestJson: File,
+    verbose: Boolean): MaybeError = {
+
+    regenerateManifestJson(manifestJson) match {
+      case Left(err) => return Left(err)
+      case _ => ()
+    }
+
+    Template.verifyTemplateMinVersion(new File("template.json")) match {
+      case Left(err) => return Left(err)
+      case Right(_) =>
+        compile(buildArgs, pioHome, verbose)
+        info("Looking for an engine...")
+        val jarFiles = jarFilesForScala
+        if (jarFiles.isEmpty) {
+          return logAndFail("No engine found. Your build might have failed. 
Aborting.")
+        }
+        jarFiles foreach { f => info(s"Found ${f.getName}")}
+        RegisterEngine.registerEngine(
+          manifestJson,
+          jarFiles,
+          false)
+    }
+  }
+
+  /** Trains an engine.
+    *  The function starts a training process to be run concurrently.
+    *
+    * @param ea An instance of [[EngineArgs]]
+    * @param wa An instance of [[WorkflowArgs]] for running a single training.
+    * @param sa An instance of [[SparkArgs]]
+    * @param pioHome [[String]] with a path to the PIO installation
+    * @param verbose A [[Boolean]]
+    * @return An instance of [[Expected]] containing either [[Left]]
+    *         with an error message or [[Right]] with a handle to the process
+    *         responsible for training and a function () => Unit,
+    *         that must be called when the process is complete
+    */
+  def train(
+    ea: EngineArgs,
+    wa: WorkflowArgs,
+    sa: SparkArgs,
+    pioHome: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
+
+    regenerateManifestJson(ea.manifestJson) match {
+      case Left(err) => return Left(err)
+      case _ => ()
+    }
+
+    Template.verifyTemplateMinVersion(new File("template.json")).right.flatMap 
{
+      _ =>
+        withRegisteredManifest(ea) { em =>
+            RunWorkflow.runWorkflow(wa, sa, em, pioHome, verbose)
+          }
+    }
+  }
+
+  /** Deploys an engine.
+    *  The function starts a new process to be run concurrently.
+    *
+    * @param ea An instance of [[EngineArgs]]
+    * @param engineInstanceId An optional [[String]] with an engine instance ID
+    * @param serverArgs An instance of [[ServerArgs]]
+    * @param sparkArgs An instance of [[SparkArgs]]
+    * @param pioHome [[String]] with a path to the PIO installation
+    * @param verbose A [[Boolean]]
+    * @return An instance of [[Expected]] containing either [[Left]]
+    *         with an error message or [[Right]] with a handle to the process
+    *         of a running engine and a function () => Unit,
+    *         that must be called when the process is complete
+    */
+  def deploy(
+    ea: EngineArgs,
+    engineInstanceId: Option[String],
+    serverArgs: ServerArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    verbose: Boolean = false): Expected[(Process, () => Unit)] = {
+
+    val verifyResult = Template.verifyTemplateMinVersion(new File("template.json"))
+    if (verifyResult.isLeft) {
+      return Left(verifyResult.left.get)
+    }
+    withRegisteredManifest(ea) { em =>
+      val variantJson = parse(Source.fromFile(serverArgs.variantJson).mkString)
+      val variantId = variantJson \ "id" match {
+        case JString(s) => s
+        case _ =>
+          return logAndFail("Unable to read engine variant ID from " +
+            s"${serverArgs.variantJson.getCanonicalPath}. Aborting.")
+      }
+      val engineInstances = storage.Storage.getMetaDataEngineInstances
+      val engineInstance = engineInstanceId map { eid =>
+        engineInstances.get(eid)
+      } getOrElse {
+        engineInstances.getLatestCompleted(em.id, em.version, variantId)
+      }
+      engineInstance map { r =>
+        RunServer.runServer(r.id, serverArgs, sparkArgs, em, pioHome, verbose)
+      } getOrElse {
+        engineInstanceId map { eid =>
+          logAndFail(s"Invalid engine instance ID ${eid}. Aborting.")
+        } getOrElse {
+          logAndFail(s"No valid engine instance found for engine ${em.id} " +
+            s"${em.version}.\nTry running 'train' before 'deploy'. Aborting.")
+        }
+      }
+    }
+  }
+
+  def undeploy(da: DeployArgs): MaybeError = {
+
+    val serverUrl = s"http://${da.ip}:${da.port}";
+    info(
+      s"Undeploying any existing engine instance at ${serverUrl}")
+    try {
+      val code = Http(s"${serverUrl}/stop").asString.code
+      code match {
+        case 200 => Success
+        case 404 =>
+          logAndFail(s"Another process is using ${serverUrl}. Unable to 
undeploy.")
+        case _ =>
+          logAndFail(s"Another process is using ${serverUrl}, or an existing " 
+
+            s"engine server is not responding properly (HTTP ${code}). " +
+            "Unable to undeploy.")
+      }
+    } catch {
+      case e: java.net.ConnectException =>
+        logAndFail(s"Nothing at ${serverUrl}")
+      case _: Throwable =>
+        logAndFail("Another process might be occupying " +
+          s"${da.ip}:${da.port}. Unable to undeploy.")
+    }
+  }
+
+  /** Runs a driver on Spark.
+    *  The function starts a process and returns immediately.
+    *
+    * @param mainClass A [[String]] with the class containing a main function to run
+    * @param driverArguments Arguments to be passed to the main function
+    * @param manifestJson An instance of [[File]] with the engine manifest
+    * @param buildArgs An instance of [[BuildArgs]]
+    * @param sparkArgs An instance of [[SparkArgs]]
+    * @param pioHome [[String]] with a path to the PIO installation
+    * @param verbose A [[Boolean]]
+    * @return An instance of [[Expected]] containing either [[Left]]
+    *         with an error message or [[Right]] with a handle to the process
+    *         of a running driver
+    */
+  def run(
+    mainClass: String,
+    driverArguments: Seq[String],
+    manifestJson: File,
+    buildArgs: BuildArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    verbose: Boolean): Expected[Process] = {
+
+    generateManifestJson(manifestJson) match {
+      case Left(err) => return Left(err)
+      case _ => ()
+    }
+
+    compile(buildArgs, pioHome, verbose)
+
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    val jarFiles = jarFilesForScala
+    jarFiles foreach { f => info(s"Found JAR: ${f.getName}") }
+    val allJarFiles = jarFiles.map(_.getCanonicalPath)
+    val cmd = s"${getSparkHome(sparkArgs.sparkHome)}/bin/spark-submit --jars " 
+
+      s"${allJarFiles.mkString(",")} " +
+      (if (extraFiles.size > 0) {
+        s"--files ${extraFiles.mkString(",")} "
+      } else {
+        ""
+      }) +
+      "--class " +
+      s"${mainClass} ${sparkArgs.sparkPassThrough.mkString(" ")} " +
+      coreAssembly(pioHome) + " " +
+      driverArguments.mkString(" ")
+    info(s"Submission command: ${cmd}")
+    Right(Process(
+      cmd,
+      None,
+      "SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
+        map(kv => s"${kv._1}=${kv._2}").mkString(",")).run())
+  }
+
+  def unregister(jsonManifest: File): MaybeError = {
+    RegisterEngine.unregisterEngine(jsonManifest)
+  }
+
+}
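A note for reviewers: the extracted train and deploy entry points no longer block or call sys.exit; they hand back Expected[(Process, () => Unit)] so the caller owns the process lifecycle. Below is a minimal sketch of driving a training run through this API. It is illustrative only: the PIO_HOME lookup is an assumption, and it assumes WorkflowArgs() and SparkArgs() provide usable defaults, as the other *Args case classes in this commit do.

    import org.apache.predictionio.tools.commands.{Engine, EngineArgs}
    import org.apache.predictionio.tools.{SparkArgs, WorkflowArgs}

    object TrainSketch extends App {
      // Hypothetical driver for Engine.train; argument values are not
      // defaults prescribed by this commit.
      Engine.train(EngineArgs(), WorkflowArgs(), SparkArgs(),
        pioHome = sys.env("PIO_HOME")) match {
        case Left(err) =>
          Console.err.println(err)          // error was already logged via EitherLogging
        case Right((proc, onComplete)) =>
          val exitCode = proc.exitValue()   // block until training finishes
          onComplete()                      // per the contract: call when the process is complete
          println(s"Training exited with code $exitCode")
      }
    }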

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala 
b/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala
new file mode 100644
index 0000000..ed6b487
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.tools.Runner
+import org.apache.predictionio.tools.SparkArgs
+import org.apache.predictionio.tools.ReturnTypes._
+
+import scala.sys.process._
+
+case class ExportArgs(
+  appId: Int = 0,
+  channel: Option[String] = None,
+  outputPath: String = "",
+  format: String = "json")
+
+object Export {
+  def eventsToFile(
+    ea: ExportArgs,
+    sa: SparkArgs,
+    pioHome: String): Expected[(Process, () => Unit)] = {
+
+    val channelArg = ea.channel
+      .map(ch => Seq("--channel", ch)).getOrElse(Nil)
+    Runner.runOnSpark(
+      "org.apache.predictionio.tools.export.EventsToFile",
+      Seq(
+        "--appid",
+        ea.appId.toString,
+        "--output",
+        ea.outputPath,
+        "--format",
+        ea.format) ++ channelArg,
+      sa,
+      Nil,
+      pioHome)
+  }
+}
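Like train and deploy, eventsToFile returns a Spark driver process handle plus a completion callback instead of blocking. A sketch of a caller follows; the app ID, output path, and PIO_HOME lookup are made-up values, and SparkArgs() is again assumed to have defaults.

    import org.apache.predictionio.tools.SparkArgs
    import org.apache.predictionio.tools.commands.{Export, ExportArgs}

    // Hypothetical usage: export app 1's events as JSON to /tmp/events.
    Export.eventsToFile(
      ExportArgs(appId = 1, outputPath = "/tmp/events", format = "json"),
      SparkArgs(),
      pioHome = sys.env("PIO_HOME")
    ).right.foreach { case (proc, onComplete) =>
      proc.exitValue()   // wait for the Spark job to finish
      onComplete()
    }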

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala 
b/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala
new file mode 100644
index 0000000..9fac559
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.tools.Runner
+import org.apache.predictionio.tools.SparkArgs
+import org.apache.predictionio.tools.ReturnTypes._
+
+import scala.sys.process._
+
+case class ImportArgs(
+  appId: Int = 0,
+  channel: Option[String] = None,
+  inputPath: String = "")
+
+object Import {
+  def fileToEvents(
+    ia: ImportArgs,
+    sa: SparkArgs,
+    pioHome: String): Expected[(Process, () => Unit)] = {
+
+    val channelArg = ia.channel
+      .map(ch => Seq("--channel", ch)).getOrElse(Nil)
+    Runner.runOnSpark(
+      "org.apache.predictionio.tools.imprt.FileToEvents",
+      Seq(
+        "--appid",
+        ia.appId.toString,
+        "--input",
+        ia.inputPath) ++ channelArg,
+      sa,
+      Nil,
+      pioHome)
+  }
+}
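fileToEvents mirrors the export path, with the same handle-plus-callback contract and the same assumptions about illustrative values:

    import org.apache.predictionio.tools.SparkArgs
    import org.apache.predictionio.tools.commands.{Import, ImportArgs}

    // Hypothetical usage: import previously exported events back into app 1.
    Import.fileToEvents(
      ImportArgs(appId = 1, inputPath = "/tmp/events/part-00000"),
      SparkArgs(),
      pioHome = sys.env("PIO_HOME")
    ).right.foreach { case (proc, onComplete) =>
      proc.exitValue()
      onComplete()
    }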

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala 
b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
new file mode 100644
index 0000000..10aca41
--- /dev/null
+++ 
b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.core.BuildInfo
+import org.apache.predictionio.data.storage
+import org.apache.predictionio.data.api.EventServer
+import org.apache.predictionio.data.api.EventServerConfig
+import org.apache.predictionio.tools.EventServerArgs
+import org.apache.predictionio.tools.EitherLogging
+import org.apache.predictionio.tools.Common
+import org.apache.predictionio.tools.ReturnTypes._
+import org.apache.predictionio.tools.dashboard.Dashboard
+import org.apache.predictionio.tools.dashboard.DashboardConfig
+import org.apache.predictionio.tools.admin.AdminServer
+import org.apache.predictionio.tools.admin.AdminServerConfig
+
+import akka.actor.ActorSystem
+import java.io.File
+import scala.io.Source
+import semverfi._
+
+case class DashboardArgs(
+  ip: String = "127.0.0.1",
+  port: Int = 9000)
+
+case class AdminServerArgs(
+  ip: String = "127.0.0.1",
+  port: Int = 7071)
+
+case class PioStatus(
+  version: String = "",
+  pioHome: String = "",
+  sparkHome: String = "",
+  sparkVersion: String = "",
+  sparkMinVersion: String = "",
+  warnings: Seq[String] = Seq())
+
+object Management extends EitherLogging {
+
+  def version(): String = BuildInfo.version
+
+  /** Starts a dashboard server and returns immediately
+    *
+    * @param da An instance of [[DashboardArgs]]
+    * @return An instance of [[ActorSystem]] in which the server is being executed
+    */
+  def dashboard(da: DashboardArgs): ActorSystem = {
+    info(s"Creating dashboard at ${da.ip}:${da.port}")
+    Dashboard.createDashboard(DashboardConfig(
+      ip = da.ip,
+      port = da.port))
+  }
+
+  /** Starts an event server and returns immediately
+    *
+    * @param ea An instance of [[EventServerArgs]]
+    * @return An instance of [[ActorSystem]] in which the server is being executed
+    */
+  def eventserver(ea: EventServerArgs): ActorSystem = {
+    info(s"Creating Event Server at ${ea.ip}:${ea.port}")
+    EventServer.createEventServer(EventServerConfig(
+      ip = ea.ip,
+      port = ea.port,
+      stats = ea.stats))
+  }
+
+  /** Starts an admin server and returns immediately
+    *
+    * @param aa An instance of [[AdminServerArgs]]
+    * @return An instance of [[ActorSystem]] in which the server is being executed
+    */
+  def adminserver(aa: AdminServerArgs): ActorSystem = {
+    info(s"Creating Admin Server at ${aa.ip}:${aa.port}")
+    AdminServer.createAdminServer(AdminServerConfig(
+      ip = aa.ip,
+      port = aa.port
+    ))
+  }
+
+  private def stripMarginAndNewlines(string: String): String =
+    string.stripMargin.replaceAll("\n", " ")
+
+  def status(pioHome: Option[String], sparkHome: Option[String]): Expected[PioStatus] = {
+    var pioStatus = PioStatus()
+    info("Inspecting PredictionIO...")
+    pioHome map { pioHome =>
+      info(s"PredictionIO ${BuildInfo.version} is installed at $pioHome")
+      pioStatus = pioStatus.copy(version = version(), pioHome = pioHome)
+    } getOrElse {
+      return logAndFail("Unable to locate PredictionIO installation. 
Aborting.")
+    }
+    info("Inspecting Apache Spark...")
+    val sparkHomePath = Common.getSparkHome(sparkHome)
+    if (new File(s"$sparkHomePath/bin/spark-submit").exists) {
+      info(s"Apache Spark is installed at $sparkHome")
+      val sparkMinVersion = "1.3.0"
+      pioStatus = pioStatus.copy(
+        sparkHome = sparkHomePath,
+        sparkMinVersion = sparkMinVersion)
+      val sparkReleaseFile = new File(s"$sparkHomePath/RELEASE")
+      if (sparkReleaseFile.exists) {
+        val sparkReleaseStrings =
+          Source.fromFile(sparkReleaseFile).mkString.split(' ')
+        if (sparkReleaseStrings.length < 2) {
+          val warning = (stripMarginAndNewlines(
+            s"""|Apache Spark version information cannot be found (RELEASE file
+                |is empty). This is a known issue for certain vendors (e.g.
+                |Cloudera). Please make sure you are using a version of at least
+                |$sparkMinVersion."""))
+          warn(warning)
+          pioStatus = pioStatus.copy(warnings = pioStatus.warnings :+ warning)
+        } else {
+          val sparkReleaseVersion = sparkReleaseStrings(1)
+          val parsedMinVersion = Version.apply(sparkMinVersion)
+          val parsedCurrentVersion = Version.apply(sparkReleaseVersion)
+          if (parsedCurrentVersion >= parsedMinVersion) {
+            info(stripMarginAndNewlines(
+              s"""|Apache Spark $sparkReleaseVersion detected (meets minimum
+                  |requirement of $sparkMinVersion)"""))
+            pioStatus = pioStatus.copy(sparkVersion = sparkReleaseVersion)
+          } else {
+            return logAndFail(stripMarginAndNewlines(
+              s"""|Apache Spark $sparkReleaseVersion detected (does not meet
+                  |minimum requirement. Aborting."""))
+          }
+        }
+      } else {
+        val warning = (stripMarginAndNewlines(
+          s"""|Apache Spark version information cannot be found. If you are
+              |using a developmental tree, please make sure you are using a
+              |version of at least $sparkMinVersion."""))
+        warn(warning)
+        pioStatus = pioStatus.copy(warnings = pioStatus.warnings :+ warning)
+      }
+    } else {
+      return logAndFail("Unable to locate a proper Apache Spark installation. 
Aborting.")
+    }
+    info("Inspecting storage backend connections...")
+    try {
+      storage.Storage.verifyAllDataObjects()
+    } catch {
+      case e: Throwable =>
+        val errStr = s"""Unable to connect to all storage backends 
successfully.
+            |The following shows the error message from the storage backend.
+            |${e.getMessage} (${e.getClass.getName})", e)
+            |Dumping configuration of initialized storage backend sources.
+            |"Please make sure they are correct.
+            |"""
+        val sources = storage.Storage.config.get("sources") map { src =>
+          src map { case (s, p) =>
+            s"Source Name: $s; Type: ${p.getOrElse("type", "(error)")}; " +
+              s"Configuration: ${p.getOrElse("config", "(error)")}"
+          } mkString("\n")
+        } getOrElse {
+          "No properly configured storage backend sources."
+        }
+        return logAndFail(errStr + sources)
+    }
+    info("Your system is all ready to go.")
+    Right(pioStatus)
+  }
+}
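status now reports through Expected[PioStatus] instead of printing and exiting, so callers can consume the result programmatically. A minimal sketch, assuming PIO_HOME and SPARK_HOME come from the environment:

    import org.apache.predictionio.tools.commands.Management

    Management.status(sys.env.get("PIO_HOME"), sys.env.get("SPARK_HOME")) match {
      case Right(s) =>
        println(s"PredictionIO ${s.version} at ${s.pioHome}; " +
          s"Spark ${s.sparkVersion} at ${s.sparkHome}")
        s.warnings.foreach(w => println(s"WARNING: $w"))
      case Left(err) =>
        Console.err.println(err)   // failure was already logged
    }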

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala 
b/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala
new file mode 100644
index 0000000..1476598
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.commands
+
+import java.io.File
+
+import scala.io.Source
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.core.BuildInfo
+import org.apache.predictionio.tools.EitherLogging
+import org.apache.predictionio.tools.ReturnTypes._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.{write, read}
+import semverfi._
+
+case class TemplateMetaData(
+  pioVersionMin: Option[String] = None)
+
+object Template extends EitherLogging {
+
+  def templateMetaData(templateJson: File): TemplateMetaData = {
+    if (!templateJson.exists) {
+      warn(s"$templateJson does not exist. Template metadata will not be 
available. " +
+        "(This is safe to ignore if you are not working on a template.)")
+      TemplateMetaData()
+    } else {
+      val jsonString = Source.fromFile(templateJson)(scala.io.Codec.ISO8859).mkString
+      val json = try {
+        parse(jsonString)
+      } catch {
+        case e: org.json4s.ParserUtil.ParseException =>
+          warn(s"$templateJson cannot be parsed. Template metadata will not be 
available.")
+          return TemplateMetaData()
+      }
+      val pioVersionMin = json \ "pio" \ "version" \ "min"
+      pioVersionMin match {
+        case JString(s) => TemplateMetaData(pioVersionMin = Some(s))
+        case _ => TemplateMetaData()
+      }
+    }
+  }
+
+  def verifyTemplateMinVersion(templateJsonFile: File): MaybeError = {
+    val metadata = templateMetaData(templateJsonFile)
+
+    for (pvm <- metadata.pioVersionMin) {
+      if (Version(BuildInfo.version) < Version(pvm)) {
+        return logAndFail(s"This engine template requires at least 
PredictionIO $pvm. " +
+          s"The template may not work with PredictionIO ${BuildInfo.version}.")
+      }
+    }
+    Success
+  }
+}
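verifyTemplateMinVersion yields Success (a Right) or a logged failure (a Left), so it composes with the other command results. A sketch of an abort-or-continue caller:

    import java.io.File
    import org.apache.predictionio.tools.commands.Template

    // Minimal sketch: check template compatibility before proceeding.
    Template.verifyTemplateMinVersion(new File("template.json")) match {
      case Right(_)  => println("Template is compatible with this PredictionIO.")
      case Left(err) => Console.err.println(err)   // failure was already logged
    }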

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
----------------------------------------------------------------------
diff --git 
a/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala 
b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
deleted file mode 100644
index fcb9608..0000000
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.tools.console
-
-import org.apache.predictionio.data.storage
-
-import grizzled.slf4j.Logging
-
-case class AccessKeyArgs(
-  accessKey: String = "",
-  events: Seq[String] = Seq())
-
-object AccessKey extends Logging {
-  def create(ca: ConsoleArgs): Int = {
-    val apps = storage.Storage.getMetaDataApps
-    apps.getByName(ca.app.name) map { app =>
-      val accessKeys = storage.Storage.getMetaDataAccessKeys
-      val accessKey = accessKeys.insert(storage.AccessKey(
-        key = ca.accessKey.accessKey,
-        appid = app.id,
-        events = ca.accessKey.events))
-      accessKey map { k =>
-        info(s"Created new access key: ${k}")
-        0
-      } getOrElse {
-        error(s"Unable to create new access key.")
-        1
-      }
-    } getOrElse {
-      error(s"App ${ca.app.name} does not exist. Aborting.")
-      1
-    }
-  }
-
-  def list(ca: ConsoleArgs): Int = {
-    val keys =
-      if (ca.app.name == "") {
-        storage.Storage.getMetaDataAccessKeys.getAll
-      } else {
-        val apps = storage.Storage.getMetaDataApps
-        apps.getByName(ca.app.name) map { app =>
-          storage.Storage.getMetaDataAccessKeys.getByAppid(app.id)
-        } getOrElse {
-          error(s"App ${ca.app.name} does not exist. Aborting.")
-          return 1
-        }
-      }
-    val title = "Access Key(s)"
-    info(f"$title%64s | App ID | Allowed Event(s)")
-    keys.sortBy(k => k.appid) foreach { k =>
-      val events =
-        if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)"
-      info(f"${k.key}%64s | ${k.appid}%6d | $events%s")
-    }
-    info(s"Finished listing ${keys.size} access key(s).")
-    0
-  }
-
-  def delete(ca: ConsoleArgs): Int = {
-    try {
-      storage.Storage.getMetaDataAccessKeys.delete(ca.accessKey.accessKey)
-      info(s"Deleted access key ${ca.accessKey.accessKey}.")
-      0
-    } catch {
-      case e: Exception =>
-        error(s"Error deleting access key ${ca.accessKey.accessKey}.", e)
-        1
-    }
-  }
-}
