[PIO-28] Console refactor: extract logic from functions handling console commands in the tools package
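This refactor changes the contract of the console command functions: instead of logging and calling sys.exit themselves, commands now report outcomes through the Either-based aliases MaybeError and Expected[T] introduced in the new Common.scala below, and long-running operations (train, deploy, import/export) return a (Process, () => Unit) pair so the caller controls termination and cleanup. A minimal sketch of the new convention follows, assuming only the ReturnTypes aliases and the EitherLogging trait from this commit; exampleCommand, exampleLookup, and the main wrapper are hypothetical illustrations, not part of the change:

import org.apache.predictionio.tools.EitherLogging
import org.apache.predictionio.tools.ReturnTypes._

object ReturnTypesSketch extends EitherLogging {
  // A side-effecting command in the refactored style: failure becomes
  // Left(message) instead of error(...) followed by sys.exit(1).
  def exampleCommand(name: String): MaybeError =
    if (name.nonEmpty) logAndSucceed(s"Processed ${name}.")
    else logAndFail("Name must not be empty. Aborting.")

  // A value-producing command uses Expected[T] = Either[String, T].
  def exampleLookup(id: Int): Expected[String] =
    if (id > 0) Right(s"app-${id}") else logAndFail(s"Invalid ID: ${id}")

  // Only the console front end (Pio.scala) turns an error into an exit code.
  def main(args: Array[String]): Unit =
    sys.exit(exampleCommand(args.headOption.getOrElse("")) match {
      case Right(_) => 0
      case Left(_) => 1
    })
}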
Closes #283 Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/e4a3c0c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/e4a3c0c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/e4a3c0c9 Branch: refs/heads/develop Commit: e4a3c0c9fc1251d7355d921acb66168226446b3f Parents: f7c30d3 Author: Marcin Ziemiński <[email protected]> Authored: Tue Nov 1 10:54:07 2016 -0700 Committer: Donald Szeto <[email protected]> Committed: Tue Nov 1 10:54:07 2016 -0700 ---------------------------------------------------------------------- .../predictionio/data/api/EventServer.scala | 5 +- .../org/apache/predictionio/tools/Common.scala | 114 +++ .../predictionio/tools/RegisterEngine.scala | 18 +- .../apache/predictionio/tools/RunServer.scala | 176 ++--- .../apache/predictionio/tools/RunWorkflow.scala | 195 +---- .../org/apache/predictionio/tools/Runner.scala | 66 +- .../predictionio/tools/admin/AdminAPI.scala | 5 +- .../predictionio/tools/commands/AccessKey.scala | 70 ++ .../predictionio/tools/commands/App.scala | 365 +++++++++ .../predictionio/tools/commands/Engine.scala | 424 +++++++++++ .../predictionio/tools/commands/Export.scala | 54 ++ .../predictionio/tools/commands/Import.scala | 51 ++ .../tools/commands/Management.scala | 178 +++++ .../predictionio/tools/commands/Template.scala | 71 ++ .../predictionio/tools/console/AccessKey.scala | 86 --- .../apache/predictionio/tools/console/App.scala | 540 -------------- .../predictionio/tools/console/Console.scala | 734 +++---------------- .../predictionio/tools/console/Export.scala | 45 -- .../predictionio/tools/console/Import.scala | 42 -- .../apache/predictionio/tools/console/Pio.scala | 352 +++++++++ .../predictionio/tools/console/Template.scala | 432 ----------- .../tools/dashboard/Dashboard.scala | 9 +- .../tools/console/template.scala.txt | 24 +- .../tools/console/upgrade.scala.txt | 14 +- 24 files changed, 1932 insertions(+), 2138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala index a0ba40f..648316e 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala @@ -611,7 +611,7 @@ case class EventServerConfig( stats: Boolean = false) object EventServer { - def createEventServer(config: EventServerConfig): Unit = { + def createEventServer(config: EventServerConfig): ActorSystem = { implicit val system = ActorSystem("EventServerSystem") val eventClient = Storage.getLEvents() @@ -630,7 +630,7 @@ object EventServer { if (config.stats) system.actorOf(Props[StatsActor], "StatsActor") system.actorOf(Props[PluginsActor], "PluginsActor") serverActor ! 
StartServer(config.ip, config.port) - system.awaitTermination() + system } } @@ -639,5 +639,6 @@ object Run { EventServer.createEventServer(EventServerConfig( ip = "0.0.0.0", port = 7070)) + .awaitTermination } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/Common.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Common.scala b/tools/src/main/scala/org/apache/predictionio/tools/Common.scala new file mode 100644 index 0000000..c379138 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/Common.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools + +import org.apache.predictionio.core.BuildInfo +import org.apache.predictionio.tools.ReturnTypes._ + +import grizzled.slf4j.Logging +import java.io.File + + +object ReturnTypes { + sealed case class Ok() + + type MaybeError = Either[String, Ok] + type Expected[T] = Either[String, T] + + val Success: MaybeError = Right(Ok()) +} + +trait EitherLogging extends Logging { + import ReturnTypes._ + + protected def logAndFail[T](msg: => String): Expected[T] = { + error(msg) + Left(msg) + } + + protected def logOnFail[T](msg: => String, t: => Throwable): Expected[T] = { + error(msg, t) + Left(msg) + } + + protected def logAndReturn[T](value: T, msg: => Any): Expected[T] = { + info(msg) + Right(value) + } + + protected def logAndSucceed(msg: => Any): MaybeError = { + info(msg) + Success + } +} + +object Common extends EitherLogging { + + def getSparkHome(sparkHome: Option[String]): String = { + sparkHome getOrElse { + sys.env.getOrElse("SPARK_HOME", ".") + } + } + + def versionNoPatch(fullVersion: String): String = { + val v = """^(\d+\.\d+)""".r + val versionNoPatch = for { + v(np) <- v findFirstIn fullVersion + } yield np + versionNoPatch.getOrElse(fullVersion) + } + + def jarFilesForScala: Array[File] = { + def recursiveListFiles(f: File): Array[File] = { + Option(f.listFiles) map { these => + these ++ these.filter(_.isDirectory).flatMap(recursiveListFiles) + } getOrElse Array[File]() + } + def jarFilesForScalaFilter(jars: Array[File]): Array[File] = + jars.filterNot { f => + f.getName.toLowerCase.endsWith("-javadoc.jar") || + f.getName.toLowerCase.endsWith("-sources.jar") + } + def jarFilesAt(path: File): Array[File] = recursiveListFiles(path) filter { + _.getName.toLowerCase.endsWith(".jar") + } + val libFiles = jarFilesForScalaFilter(jarFilesAt(new File("lib"))) + val scalaVersionNoPatch = Common.versionNoPatch(BuildInfo.scalaVersion) + val targetFiles = jarFilesForScalaFilter(jarFilesAt(new File("target" + + File.separator + 
s"scala-${scalaVersionNoPatch}"))) + // Use libFiles is target is empty. + if (targetFiles.size > 0) targetFiles else libFiles + } + + def coreAssembly(pioHome: String): Expected[File] = { + val core = s"pio-assembly-${BuildInfo.version}.jar" + val coreDir = + if (new File(pioHome + File.separator + "RELEASE").exists) { + new File(pioHome + File.separator + "lib") + } else { + new File(pioHome + File.separator + "assembly") + } + val coreFile = new File(coreDir, core) + if (coreFile.exists) { + Right(coreFile) + } else { + logAndFail(s"PredictionIO Core Assembly (${coreFile.getCanonicalPath}) does " + + "not exist. Aborting.") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala index 189e5a8..334981c 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala @@ -24,6 +24,7 @@ import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.EngineManifest import org.apache.predictionio.data.storage.EngineManifestSerializer import org.apache.predictionio.data.storage.Storage +import org.apache.predictionio.tools.ReturnTypes._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path @@ -32,35 +33,34 @@ import org.json4s.native.Serialization.read import scala.io.Source -object RegisterEngine extends Logging { +object RegisterEngine extends EitherLogging { val engineManifests = Storage.getMetaDataEngineManifests implicit val formats = DefaultFormats + new EngineManifestSerializer def registerEngine( jsonManifest: File, engineFiles: Seq[File], - copyLocal: Boolean = false): Unit = { + copyLocal: Boolean = false): MaybeError = { val jsonString = try { Source.fromFile(jsonManifest).mkString } catch { case e: java.io.FileNotFoundException => - error(s"Engine manifest file not found: ${e.getMessage}. Aborting.") - sys.exit(1) + return logAndFail(s"Engine manifest file not found: ${e.getMessage}. Aborting.") } val engineManifest = read[EngineManifest](jsonString) info(s"Registering engine ${engineManifest.id} ${engineManifest.version}") engineManifests.update( engineManifest.copy(files = engineFiles.map(_.toURI.toString)), true) + Success } - def unregisterEngine(jsonManifest: File): Unit = { + def unregisterEngine(jsonManifest: File): MaybeError = { val jsonString = try { Source.fromFile(jsonManifest).mkString } catch { case e: java.io.FileNotFoundException => - error(s"Engine manifest file not found: ${e.getMessage}. Aborting.") - sys.exit(1) + return logAndFail(s"Engine manifest file not found: ${e.getMessage}. 
Aborting.") } val fileEngineManifest = read[EngineManifest](jsonString) val engineManifest = engineManifests.get( @@ -78,9 +78,9 @@ object RegisterEngine extends Logging { } engineManifests.delete(em.id, em.version) - info(s"Unregistered engine ${em.id} ${em.version}") + logAndSucceed(s"Unregistered engine ${em.id} ${em.version}") } getOrElse { - error(s"${fileEngineManifest.id} ${fileEngineManifest.version} is not " + + logAndFail(s"${fileEngineManifest.id} ${fileEngineManifest.version} is not " + "registered.") } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala index d091654..1431432 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala @@ -23,159 +23,75 @@ import java.net.URI import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.tools.ReturnTypes._ import org.apache.predictionio.tools.console.ConsoleArgs import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.predictionio.workflow.JsonExtractorOption +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption import scala.sys.process._ -object RunServer extends Logging { - def runServer( - ca: ConsoleArgs, - core: File, - em: EngineManifest, - engineInstanceId: String): Int = { - val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv => - s"${kv._1}=${kv._2}" - ).mkString(",") - - val sparkHome = ca.common.sparkHome.getOrElse( - sys.env.getOrElse("SPARK_HOME", ".")) - - val extraFiles = WorkflowUtils.thirdPartyConfFiles - - val driverClassPathIndex = - ca.common.sparkPassThrough.indexOf("--driver-class-path") - val driverClassPathPrefix = - if (driverClassPathIndex != -1) { - Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1)) - } else { - Seq() - } - val extraClasspaths = - driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths +case class DeployArgs( + ip: String = "0.0.0.0", + port: Int = 8000, + logUrl: Option[String] = None, + logPrefix: Option[String] = None) - val deployModeIndex = - ca.common.sparkPassThrough.indexOf("--deploy-mode") - val deployMode = if (deployModeIndex != -1) { - ca.common.sparkPassThrough(deployModeIndex + 1) - } else { - "client" - } +case class EventServerArgs( + enabled: Boolean = false, + ip: String = "0.0.0.0", + port: Int = 7070, + stats: Boolean = false) - val mainJar = - if (ca.build.uberJar) { - if (deployMode == "cluster") { - em.files.filter(_.startsWith("hdfs")).head - } else { - em.files.filterNot(_.startsWith("hdfs")).head - } - } else { - if (deployMode == "cluster") { - em.files.filter(_.contains("pio-assembly")).head - } else { - core.getCanonicalPath - } - } - - val jarFiles = (em.files ++ Option(new File(ca.common.pioHome.get, "plugins") - .listFiles()).getOrElse(Array.empty[File]).map(_.getAbsolutePath)).mkString(",") - - val sparkSubmit = - Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++ - ca.common.sparkPassThrough ++ - Seq( - "--class", - "org.apache.predictionio.workflow.CreateServer", - "--name", - s"PredictionIO Engine Instance: ${engineInstanceId}") ++ - (if (!ca.build.uberJar) { - Seq("--jars", jarFiles) - } else Seq()) 
++ - (if (extraFiles.size > 0) { - Seq("--files", extraFiles.mkString(",")) - } else { - Seq() - }) ++ - (if (extraClasspaths.size > 0) { - Seq("--driver-class-path", extraClasspaths.mkString(":")) - } else { - Seq() - }) ++ - (if (ca.common.sparkKryo) { - Seq( - "--conf", - "spark.serializer=org.apache.spark.serializer.KryoSerializer") - } else { - Seq() - }) ++ - Seq( - mainJar, - "--engineInstanceId", - engineInstanceId, - "--ip", - ca.deploy.ip, - "--port", - ca.deploy.port.toString, - "--event-server-ip", - ca.eventServer.ip, - "--event-server-port", - ca.eventServer.port.toString) ++ - (if (ca.accessKey.accessKey != "") { - Seq("--accesskey", ca.accessKey.accessKey) - } else { - Seq() - }) ++ - (if (ca.eventServer.enabled) Seq("--feedback") else Seq()) ++ - (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ - (if (ca.common.verbose) Seq("--verbose") else Seq()) ++ - ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Seq()) ++ - ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Seq()) ++ - Seq("--json-extractor", ca.common.jsonExtractor.toString) +case class ServerArgs( + deploy: DeployArgs = DeployArgs(), + eventServer: EventServerArgs = EventServerArgs(), + batch: String = "", + accessKey: String = "", + variantJson: File = new File("engine.json"), + jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) - info(s"Submission command: ${sparkSubmit.mkString(" ")}") - val proc = - Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).run() - Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { - def run(): Unit = { - proc.destroy() - } - })) - proc.exitValue() - } +object RunServer extends Logging { - def newRunServer( - ca: ConsoleArgs, + def runServer( + engineInstanceId: String, + serverArgs: ServerArgs, + sparkArgs: SparkArgs, em: EngineManifest, - engineInstanceId: String): Int = { + pioHome: String, + verbose: Boolean = false): Expected[(Process, () => Unit)] = { + val jarFiles = em.files.map(new URI(_)) ++ - Option(new File(ca.common.pioHome.get, "plugins").listFiles()) + Option(new File(pioHome, "plugins").listFiles()) .getOrElse(Array.empty[File]).map(_.toURI) val args = Seq( "--engineInstanceId", engineInstanceId, "--engine-variant", - ca.common.variantJson.toURI.toString, + serverArgs.variantJson.toURI.toString, "--ip", - ca.deploy.ip, + serverArgs.deploy.ip, "--port", - ca.deploy.port.toString, + serverArgs.deploy.port.toString, "--event-server-ip", - ca.eventServer.ip, + serverArgs.eventServer.ip, "--event-server-port", - ca.eventServer.port.toString) ++ - (if (ca.accessKey.accessKey != "") { - Seq("--accesskey", ca.accessKey.accessKey) + serverArgs.eventServer.port.toString) ++ + (if (serverArgs.accessKey != "") { + Seq("--accesskey", serverArgs.accessKey) } else { Nil }) ++ - (if (ca.eventServer.enabled) Seq("--feedback") else Nil) ++ - (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Nil) ++ - (if (ca.common.verbose) Seq("--verbose") else Nil) ++ - ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Nil) ++ - ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Nil) ++ - Seq("--json-extractor", ca.common.jsonExtractor.toString) - - Runner.runOnSpark("org.apache.predictionio.workflow.CreateServer", args, ca, jarFiles) + (if (serverArgs.eventServer.enabled) Seq("--feedback") else Nil) ++ + (if (serverArgs.batch != "") Seq("--batch", serverArgs.batch) else Nil) ++ + (if (verbose) Seq("--verbose") else Nil) ++ + serverArgs.deploy.logUrl.map(x => 
Seq("--log-url", x)).getOrElse(Nil) ++ + serverArgs.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Nil) ++ + Seq("--json-extractor", serverArgs.jsonExtractor.toString) + + Runner.runOnSpark( + "org.apache.predictionio.workflow.CreateServer", + args, sparkArgs, jarFiles, pioHome, verbose) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala index 097eb83..8b8d769 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala @@ -24,156 +24,39 @@ import java.net.URI import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.EngineManifest import org.apache.predictionio.tools.console.ConsoleArgs +import org.apache.predictionio.tools.ReturnTypes._ import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.predictionio.workflow.JsonExtractorOption +import org.apache.predictionio.workflow.JsonExtractorOption.JsonExtractorOption + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import scala.sys.process._ -object RunWorkflow extends Logging { - def runWorkflow( - ca: ConsoleArgs, - core: File, - em: EngineManifest, - variantJson: File): Int = { - // Collect and serialize PIO_* environmental variables - val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv => - s"${kv._1}=${kv._2}" - ).mkString(",") - - val sparkHome = ca.common.sparkHome.getOrElse( - sys.env.getOrElse("SPARK_HOME", ".")) - - val hadoopConf = new Configuration - val hdfs = FileSystem.get(hadoopConf) - - val driverClassPathIndex = - ca.common.sparkPassThrough.indexOf("--driver-class-path") - val driverClassPathPrefix = - if (driverClassPathIndex != -1) { - Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1)) - } else { - Seq() - } - val extraClasspaths = - driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths - - val deployModeIndex = - ca.common.sparkPassThrough.indexOf("--deploy-mode") - val deployMode = if (deployModeIndex != -1) { - ca.common.sparkPassThrough(deployModeIndex + 1) - } else { - "client" - } - - val extraFiles = WorkflowUtils.thirdPartyConfFiles +case class WorkflowArgs( + batch: String = "", + variantJson: File = new File("engine.json"), + verbosity: Int = 0, + engineParamsKey: Option[String] = None, + engineFactory: Option[String] = None, + evaluation: Option[String] = None, + engineParamsGenerator: Option[String] = None, + stopAfterRead: Boolean = false, + stopAfterPrepare: Boolean = false, + skipSanityCheck: Boolean = false, + jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) - val mainJar = - if (ca.build.uberJar) { - if (deployMode == "cluster") { - em.files.filter(_.startsWith("hdfs")).head - } else { - em.files.filterNot(_.startsWith("hdfs")).head - } - } else { - if (deployMode == "cluster") { - em.files.filter(_.contains("pio-assembly")).head - } else { - core.getCanonicalPath - } - } - - val workMode = - ca.common.evaluation.map(_ => "Evaluation").getOrElse("Training") - - val engineLocation = Seq( - sys.env("PIO_FS_ENGINESDIR"), - em.id, - em.version) - - if (deployMode == "cluster") { - val dstPath = new 
Path(engineLocation.mkString(Path.SEPARATOR)) - info("Cluster deploy mode detected. Trying to copy " + - s"${variantJson.getCanonicalPath} to " + - s"${hdfs.makeQualified(dstPath).toString}.") - hdfs.copyFromLocalFile(new Path(variantJson.toURI), dstPath) - } - - val sparkSubmit = - Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++ - ca.common.sparkPassThrough ++ - Seq( - "--class", - "org.apache.predictionio.workflow.CreateWorkflow", - "--name", - s"PredictionIO $workMode: ${em.id} ${em.version} (${ca.common.batch})") ++ - (if (!ca.build.uberJar) { - Seq("--jars", em.files.mkString(",")) - } else Seq()) ++ - (if (extraFiles.size > 0) { - Seq("--files", extraFiles.mkString(",")) - } else { - Seq() - }) ++ - (if (extraClasspaths.size > 0) { - Seq("--driver-class-path", extraClasspaths.mkString(":")) - } else { - Seq() - }) ++ - (if (ca.common.sparkKryo) { - Seq( - "--conf", - "spark.serializer=org.apache.spark.serializer.KryoSerializer") - } else { - Seq() - }) ++ - Seq( - mainJar, - "--env", - pioEnvVars, - "--engine-id", - em.id, - "--engine-version", - em.version, - "--engine-variant", - if (deployMode == "cluster") { - hdfs.makeQualified(new Path( - (engineLocation :+ variantJson.getName).mkString(Path.SEPARATOR))). - toString - } else { - variantJson.getCanonicalPath - }, - "--verbosity", - ca.common.verbosity.toString) ++ - ca.common.engineFactory.map( - x => Seq("--engine-factory", x)).getOrElse(Seq()) ++ - ca.common.engineParamsKey.map( - x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++ - (if (deployMode == "cluster") Seq("--deploy-mode", "cluster") else Seq()) ++ - (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ - (if (ca.common.verbose) Seq("--verbose") else Seq()) ++ - (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++ - (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++ - (if (ca.common.stopAfterPrepare) { - Seq("--stop-after-prepare") - } else { - Seq() - }) ++ - ca.common.evaluation.map(x => Seq("--evaluation-class", x)). - getOrElse(Seq()) ++ - // If engineParamsGenerator is specified, it overrides the evaluation. - ca.common.engineParamsGenerator.orElse(ca.common.evaluation) - .map(x => Seq("--engine-params-generator-class", x)) - .getOrElse(Seq()) ++ - (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ - Seq("--json-extractor", ca.common.jsonExtractor.toString) +object RunWorkflow extends Logging { - info(s"Submission command: ${sparkSubmit.mkString(" ")}") - Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).! 
- } + def runWorkflow( + wa: WorkflowArgs, + sa: SparkArgs, + em: EngineManifest, + pioHome: String, + verbose: Boolean = false): Expected[(Process, () => Unit)] = { - def newRunWorkflow(ca: ConsoleArgs, em: EngineManifest): Int = { val jarFiles = em.files.map(new URI(_)) val args = Seq( "--engine-id", @@ -181,35 +64,37 @@ object RunWorkflow extends Logging { "--engine-version", em.version, "--engine-variant", - ca.common.variantJson.toURI.toString, + wa.variantJson.toURI.toString, "--verbosity", - ca.common.verbosity.toString) ++ - ca.common.engineFactory.map( + wa.verbosity.toString) ++ + wa.engineFactory.map( x => Seq("--engine-factory", x)).getOrElse(Seq()) ++ - ca.common.engineParamsKey.map( + wa.engineParamsKey.map( x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++ - (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ - (if (ca.common.verbose) Seq("--verbose") else Seq()) ++ - (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++ - (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++ - (if (ca.common.stopAfterPrepare) { + (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++ + (if (verbose) Seq("--verbose") else Seq()) ++ + (if (wa.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++ + (if (wa.stopAfterRead) Seq("--stop-after-read") else Seq()) ++ + (if (wa.stopAfterPrepare) { Seq("--stop-after-prepare") } else { Seq() }) ++ - ca.common.evaluation.map(x => Seq("--evaluation-class", x)). + wa.evaluation.map(x => Seq("--evaluation-class", x)). getOrElse(Seq()) ++ // If engineParamsGenerator is specified, it overrides the evaluation. - ca.common.engineParamsGenerator.orElse(ca.common.evaluation) + wa.engineParamsGenerator.orElse(wa.evaluation) .map(x => Seq("--engine-params-generator-class", x)) .getOrElse(Seq()) ++ - (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ - Seq("--json-extractor", ca.common.jsonExtractor.toString) + (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++ + Seq("--json-extractor", wa.jsonExtractor.toString) Runner.runOnSpark( "org.apache.predictionio.workflow.CreateWorkflow", args, - ca, - jarFiles) + sa, + jarFiles, + pioHome, + verbose) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala index b3ec51c..d9752df 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala @@ -21,16 +21,22 @@ package org.apache.predictionio.tools import java.io.File import java.net.URI -import grizzled.slf4j.Logging import org.apache.predictionio.tools.console.ConsoleArgs import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.predictionio.tools.ReturnTypes._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import scala.sys.process._ -object Runner extends Logging { +case class SparkArgs( + sparkHome: Option[String] = None, + sparkPassThrough: Seq[String] = Seq(), + sparkKryo: Boolean = false, + scratchUri: Option[URI] = None) + +object Runner extends EitherLogging { def envStringToMap(env: String): Map[String, String] = env.split(',').flatMap(p => p.split('=') match { @@ -95,26 +101,27 @@ object Runner 
extends Logging { def runOnSpark( className: String, classArgs: Seq[String], - ca: ConsoleArgs, - extraJars: Seq[URI]): Int = { + sa: SparkArgs, + extraJars: Seq[URI], + pioHome: String, + verbose: Boolean = false): Expected[(Process, () => Unit)] = { // Return error for unsupported cases val deployMode = - argumentValue(ca.common.sparkPassThrough, "--deploy-mode").getOrElse("client") + argumentValue(sa.sparkPassThrough, "--deploy-mode").getOrElse("client") val master = - argumentValue(ca.common.sparkPassThrough, "--master").getOrElse("local") + argumentValue(sa.sparkPassThrough, "--master").getOrElse("local") - (ca.common.scratchUri, deployMode, master) match { + (sa.scratchUri, deployMode, master) match { case (Some(u), "client", m) if m != "yarn-cluster" => - error("--scratch-uri cannot be set when deploy mode is client") - return 1 + return logAndFail("--scratch-uri cannot be set when deploy mode is client") case (_, "cluster", m) if m.startsWith("spark://") => - error("Using cluster deploy mode with Spark standalone cluster is not supported") - return 1 + return logAndFail( + "Using cluster deploy mode with Spark standalone cluster is not supported") case _ => Unit } // Initialize HDFS API for scratch URI - val fs = ca.common.scratchUri map { uri => + val fs = sa.scratchUri map { uri => FileSystem.get(uri, new Configuration()) } @@ -124,18 +131,18 @@ object Runner extends Logging { ).mkString(",") // Location of Spark - val sparkHome = ca.common.sparkHome.getOrElse( + val sparkHome = sa.sparkHome.getOrElse( sys.env.getOrElse("SPARK_HOME", ".")) // Local path to PredictionIO assembly JAR - val mainJar = handleScratchFile( - fs, - ca.common.scratchUri, - console.Console.coreAssembly(ca.common.pioHome.get)) + val mainJar = Common.coreAssembly(pioHome) fold( + errStr => return Left(errStr), + assembly => handleScratchFile(fs, sa.scratchUri, assembly) + ) // Extra JARs that are needed by the driver val driverClassPathPrefix = - argumentValue(ca.common.sparkPassThrough, "--driver-class-path") map { v => + argumentValue(sa.sparkPassThrough, "--driver-class-path") map { v => Seq(v) } getOrElse { Nil @@ -146,11 +153,11 @@ object Runner extends Logging { // Extra files that are needed to be passed to --files val extraFiles = WorkflowUtils.thirdPartyConfFiles map { f => - handleScratchFile(fs, ca.common.scratchUri, new File(f)) + handleScratchFile(fs, sa.scratchUri, new File(f)) } val deployedJars = extraJars map { j => - handleScratchFile(fs, ca.common.scratchUri, new File(j)) + handleScratchFile(fs, sa.scratchUri, new File(j)) } val sparkSubmitCommand = @@ -174,7 +181,7 @@ object Runner extends Logging { Nil } - val sparkSubmitKryo = if (ca.common.sparkKryo) { + val sparkSubmitKryo = if (sa.sparkKryo) { Seq( "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer") @@ -182,33 +189,26 @@ object Runner extends Logging { Nil } - val verbose = if (ca.common.verbose) Seq("--verbose") else Nil + val verboseArg = if (verbose) Seq("--verbose") else Nil val sparkSubmit = Seq( sparkSubmitCommand, - ca.common.sparkPassThrough, + sa.sparkPassThrough, Seq("--class", className), sparkSubmitJars, sparkSubmitFiles, sparkSubmitExtraClasspaths, sparkSubmitKryo, Seq(mainJar), - detectFilePaths(fs, ca.common.scratchUri, classArgs), + detectFilePaths(fs, sa.scratchUri, classArgs), Seq("--env", pioEnvVars), - verbose).flatten.filter(_ != "") + verboseArg).flatten.filter(_ != "") info(s"Submission command: ${sparkSubmit.mkString(" ")}") val proc = Process( sparkSubmit, None, "CLASSPATH" -> "", 
"SPARK_YARN_USER_ENV" -> pioEnvVars).run() - Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { - def run(): Unit = { - cleanup(fs, ca.common.scratchUri) - proc.destroy() - } - })) - cleanup(fs, ca.common.scratchUri) - proc.exitValue() + Right((proc, () => cleanup(fs, sa.scratchUri))) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala index f53d84c..bbe39a5 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala @@ -133,7 +133,7 @@ case class AdminServerConfig( ) object AdminServer { - def createAdminServer(config: AdminServerConfig): Unit = { + def createAdminServer(config: AdminServerConfig): ActorSystem = { implicit val system = ActorSystem("AdminServerSystem") val commandClient = new CommandClient( @@ -146,7 +146,7 @@ object AdminServer { Props(classOf[AdminServerActor], commandClient), "AdminServerActor") serverActor ! StartServer(config.ip, config.port) - system.awaitTermination + system } } @@ -155,5 +155,6 @@ object AdminRun { AdminServer.createAdminServer(AdminServerConfig( ip = "localhost", port = 7071)) + .awaitTermination } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala new file mode 100644 index 0000000..715cf8f --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/AccessKey.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.tools.commands + +import org.apache.predictionio.data.storage +import org.apache.predictionio.tools +import org.apache.predictionio.tools.EitherLogging +import org.apache.predictionio.tools.ReturnTypes._ + +import grizzled.slf4j.Logging +import scala.util.Either + +object AccessKey extends EitherLogging { + + def create( + appName: String, + key: String, + events: Seq[String]): Expected[storage.AccessKey] = { + + val apps = storage.Storage.getMetaDataApps + apps.getByName(appName) map { app => + val accessKeys = storage.Storage.getMetaDataAccessKeys + val newKey = storage.AccessKey( + key = key, + appid = app.id, + events = events) + accessKeys.insert(newKey) map { k => + info(s"Created new access key: ${k}") + Right(newKey.copy(key = k)) + } getOrElse { + logAndFail(s"Unable to create new access key.") + } + } getOrElse { + logAndFail(s"App ${appName} does not exist. Aborting.") + } + } + + def list(app: Option[String]): Expected[Seq[storage.AccessKey]] = + app map { appName => + App.show(appName).right map { appChansPair => appChansPair._1.keys } + } getOrElse { + Right(storage.Storage.getMetaDataAccessKeys.getAll) + } + + def delete(key: String): MaybeError = { + try { + storage.Storage.getMetaDataAccessKeys.delete(key) + logAndSucceed(s"Deleted access key ${key}.") + } catch { + case e: Exception => + error(s"Error deleting access key ${key}.", e) + Left(s"Error deleting access key ${key}.") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala new file mode 100644 index 0000000..44fa667 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.tools.commands + +import org.apache.predictionio.data.storage +import org.apache.predictionio.data.storage.Channel +import org.apache.predictionio.tools.EitherLogging +import org.apache.predictionio.tools.ReturnTypes._ + +sealed case class AppDescription( + app: storage.App, + keys: Seq[storage.AccessKey]) + +object App extends EitherLogging { + + def create( + name: String, + id: Option[Int] = None, + description: Option[String] = None, + accessKey: String = "") : Expected[AppDescription] = { + + val apps = storage.Storage.getMetaDataApps() + // get the client at the beginning so errors exit right away if the client can't be accessed + val events = storage.Storage.getLEvents() + var errStr = "" + + apps.getByName(name) map { app => + errStr = s"App ${name} already exists. Aborting." + error(errStr) + errStr + } orElse { + id.flatMap { id => + apps.get(id) map { app => + errStr = s"App ID ${id} already exists and maps to the app '${app.name}'. " + + "Aborting." + error(errStr) + errStr + } + } + } map {err => Left(err)} getOrElse { + val newApp = storage.App( + id = id.getOrElse(0), + name = name, + description = description) + val appid = apps.insert(newApp) + + appid map { id => + val dbInit = events.init(id) + val r = if (dbInit) { + info(s"Initialized Event Store for this app ID: ${id}.") + val accessKeys = storage.Storage.getMetaDataAccessKeys + val newKey = storage.AccessKey( + key = accessKey, + appid = id, + events = Seq()) + accessKeys.insert(newKey) + .map { k => + Right(AppDescription( + app = newApp.copy(id = id), + keys = Seq(newKey.copy(key = k)))) + } getOrElse { + logAndFail(s"Unable to create new access key.") + } + } else { + errStr = s"Unable to initialize Event Store for this app ID: ${id}." + try { + apps.delete(id) + } catch { + case e: Exception => + errStr += s""" + |Failed to revert back the App meta-data change. + |The app ${name} CANNOT be used! + |Please run 'pio app delete ${name}' to delete this app!""" + } + logAndFail(errStr) + } + events.close() + r + } getOrElse { + logAndFail(s"Unable to create new app.") + } + } + } + + def list: Seq[AppDescription] = { + val apps = storage.Storage.getMetaDataApps.getAll().sortBy(_.name) + val accessKeys = storage.Storage.getMetaDataAccessKeys + + apps map { app => + AppDescription( + app = app, + keys = accessKeys.getByAppid(app.id)) + } + } + + def show(appName: String): Expected[(AppDescription, Seq[Channel])] = { + val apps = storage.Storage.getMetaDataApps + val accessKeys = storage.Storage.getMetaDataAccessKeys + val channels = storage.Storage.getMetaDataChannels + + apps.getByName(appName) map { app => + Right( + (AppDescription( + app = app, + keys = accessKeys.getByAppid(app.id)), + channels.getByAppid(app.id)) + ) + } getOrElse { + logAndFail(s"App ${appName} does not exist. Aborting.") + } + } + + def delete(name: String): MaybeError = { + val events = storage.Storage.getLEvents() + try { + show(name).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) => + + val delChannelStatus: MaybeError = + channels.map { ch => + if (events.remove(appDesc.app.id, Some(ch.id))) { + info(s"Removed Event Store of the channel ID: ${ch.id}") + try { + storage.Storage.getMetaDataChannels.delete(ch.id) + info(s"Deleted channel ${ch.name}") + None + } catch { + case e: Exception => + val errStr = s"Error deleting channel ${ch.name}." + error(errStr, e) + Some(errStr) + } + } else { + val errStr = s"Error removing Event Store of the channel ID: ${ch.id}." 
+ error(errStr) + Some(errStr) + } + } + .flatten + .reduceOption(_ + "\n" + _) + .map(Left(_)) getOrElse Success + + if (delChannelStatus.isLeft) { + return delChannelStatus + } + + try { + events.remove(appDesc.app.id) + info(s"Removed Event Store for this app ID: ${appDesc.app.id}") + } catch { + case e: Exception => + logAndFail(s"Error removing Event Store for this app. Aborting.") + } + + appDesc.keys foreach { key => + try { + storage.Storage.getMetaDataAccessKeys.delete(key.key) + info(s"Removed access key ${key.key}") + } catch { + case e: Exception => + logAndFail(s"Error removing access key ${key.key}. Aborting.") + } + } + + try { + storage.Storage.getMetaDataApps.delete(appDesc.app.id) + info(s"Deleted app ${appDesc.app.name}.") + } catch { + case e: Exception => + logAndFail(s"Error deleting app ${appDesc.app.name}. Aborting.") + } + logAndSucceed("Done.") + } + + } finally { + events.close() + } + } + + def dataDelete( + name: String, + channel: Option[String] = None, + all: Boolean = false): MaybeError = { + + var errStr = "" + val events = storage.Storage.getLEvents() + try { + show(name).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) => + + val chanIdsToRemove: Seq[Option[Int]] = + if (all) { + channels.map(ch => Some(ch.id)) :+ None // remove default channel too + } else { + channel.map { chName => + channels.find(ch => ch.name == chName) match { + case None => + return logAndFail(s"""Unable to delete data for channel. + |Channel ${chName} doesn't exist.""") + case Some(ch) => Seq(Some(ch.id)) + } + } getOrElse { + Seq(None) // for default channel + } + } + + chanIdsToRemove.map { chId: Option[Int] => + + val r1 = if (events.remove(appDesc.app.id, chId)) { + if (chId.isDefined) { + info(s"Removed Event Store for the channel ID: ${chId.get}") + } else { + info(s"Removed Event Store for the app ID: ${appDesc.app.id}") + } + None + } else { + errStr = + if (chId.isDefined) s"Error removing Event Store for the channel ID: ${chId.get}." + else s"Error removing Event Store for the app ID: ${appDesc.app.id}." + error(errStr) + Some(errStr) + } + // re-create table + val dbInit = events.init(appDesc.app.id, chId) + val r2 = if (dbInit) { + if (chId.isDefined) { + info(s"Initialized Event Store for the channel ID: ${chId.get}") + } else { + info(s"Initialized Event Store for the app ID: ${appDesc.app.id}") + } + None + } else { + errStr = + if (chId.isDefined) { + s"Unable to initialize Event Store for the channel ID: ${chId.get}." + } + else { + s"Unable to initialize Event Store for the app ID: ${appDesc.app.id}." + } + error(errStr) + Some(errStr) + } + Seq(r1, r2) + } + .flatten.flatten + .reduceOption(_ + "\n" + _) + .toLeft(Ok()) + } + } finally { + events.close() + } + } + + def channelNew(appName: String, newChannel: String): Expected[Channel] = { + val events = storage.Storage.getLEvents() + val chanStorage = storage.Storage.getMetaDataChannels + var errStr = "" + try { + show(appName).right flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) => + if (channels.find(ch => ch.name == newChannel).isDefined) { + logAndFail(s"""Channel ${newChannel} already exists. + |Unable to create new channel.""") + } else if (!storage.Channel.isValidName(newChannel)) { + logAndFail(s"""Unable to create new channel. + |The channel name ${newChannel} is invalid. 
|${storage.Channel.nameConstraint}""") + } else { + + val channel = Channel( + id = 0, + appid = appDesc.app.id, + name = newChannel) + + chanStorage.insert(channel) map { chanId => + + info(s"Updated Channel meta-data.") + + // initialize storage + val dbInit = events.init(appDesc.app.id, Some(chanId)) + if (dbInit) { + info(s"Initialized Event Store for the channel: ${newChannel}.") + info(s"Created new channel:") + info(s" Channel Name: ${newChannel}") + info(s" Channel ID: ${chanId}") + info(s" App ID: ${appDesc.app.id}") + Right(channel.copy(id = chanId)) + } else { + errStr = s"""Unable to create new channel. + |Failed to initialize Event Store.""" + error(errStr) + // revert back the meta-data change + try { + chanStorage.delete(chanId) + Left(errStr) + } catch { + case e: Exception => + val nextErrStr = s""" + |Failed to revert back the Channel meta-data change. + |The channel ${newChannel} CANNOT be used! + |Please run 'pio app channel-delete ${appName} ${newChannel}'""" + + " to delete this channel!" + logAndFail(errStr + nextErrStr) + } + } + } getOrElse { + logAndFail(s"""Unable to create new channel. + |Failed to update Channel meta-data.""") + } + } + } + } finally { + events.close() + } + } + + def channelDelete(appName: String, deleteChannel: String): MaybeError = { + val chanStorage = storage.Storage.getMetaDataChannels + val events = storage.Storage.getLEvents() + var errStr = "" + try { + show(appName).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) => + val foundChannel = channels.find(ch => ch.name == deleteChannel) + if (foundChannel.isEmpty) { + logAndFail(s"""Unable to delete channel. + |Channel ${deleteChannel} doesn't exist.""") + } else { + val chId = foundChannel.get.id + val dbRemoved = events.remove(appDesc.app.id, Some(chId)) + if (dbRemoved) { + info(s"Removed Event Store for this channel: ${deleteChannel}") + try { + chanStorage.delete(chId) + logAndSucceed(s"Deleted channel: ${deleteChannel}.") + } catch { + case e: Exception => + logAndFail(s"""Unable to delete channel. + |Failed to update Channel meta-data. + |The channel ${deleteChannel} CANNOT be used! + |Please run 'pio app channel-delete ${appDesc.app.name} ${deleteChannel}'""" + + " to delete this channel again!") + } + } else { + logAndFail(s"""Unable to delete channel. + |Error removing Event Store for this channel.""") + } + } + } + } finally { + events.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala new file mode 100644 index 0000000..6fd8977 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Engine.scala @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools.commands + +import org.apache.predictionio.core.BuildInfo +import org.apache.predictionio.controller.Utils +import org.apache.predictionio.data.storage +import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.data.storage.EngineManifestSerializer +import org.apache.predictionio.tools.RegisterEngine +import org.apache.predictionio.tools.EitherLogging +import org.apache.predictionio.tools.{RunWorkflow, RunServer} +import org.apache.predictionio.tools.{DeployArgs, WorkflowArgs, SparkArgs, ServerArgs} +import org.apache.predictionio.tools.ReturnTypes._ +import org.apache.predictionio.tools.Common._ +import org.apache.predictionio.workflow.WorkflowUtils + +import org.apache.commons.io.FileUtils +import org.json4s._ +import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write +import scala.io.Source +import scala.util.Random +import scala.collection.JavaConversions._ +import scala.sys.process._ +import scalaj.http.Http +import java.io.File + +case class BuildArgs( + sbt: Option[File] = None, + sbtExtra: Option[String] = None, + sbtAssemblyPackageDependency: Boolean = true, + sbtClean: Boolean = false, + uberJar: Boolean = false, + forceGeneratePIOSbt: Boolean = false) + +case class EngineArgs( + manifestJson: File = new File("manifest.json"), + engineId: Option[String] = None, + engineVersion: Option[String] = None) + +object Engine extends EitherLogging { + + private val manifestAutogenTag = "pio-autogen-manifest" + + private def readManifestJson(json: File): Expected[EngineManifest] = { + implicit val formats = Utils.json4sDefaultFormats + + new EngineManifestSerializer + try { + Right(read[EngineManifest](Source.fromFile(json).mkString)) + } catch { + case e: java.io.FileNotFoundException => + logAndFail(s"${json.getCanonicalPath} does not exist. Aborting.") + case e: MappingException => + logAndFail(s"${json.getCanonicalPath} has invalid content: " + + e.getMessage) + } + } + + private def withRegisteredManifest[T](ea: EngineArgs)( + op: EngineManifest => Expected[T]): Expected[T] = { + val res: Expected[Expected[T]] = for { + ej <- readManifestJson(ea.manifestJson).right + id <- Right(ea.engineId getOrElse ej.id).right + version <- Right(ea.engineVersion getOrElse ej.version).right + manifest <- storage.Storage.getMetaDataEngineManifests.get(id, version) + .toRight { + val errStr = + s"""Engine ${id} ${version} cannot be found in the system. + |Possible reasons: + |- the engine is not yet built by the 'build' command; + |- the meta data store is offline.""" + error(errStr) + errStr + }.right + } yield { + op(manifest) + } + res.joinRight + } + + private def generateManifestJson(json: File): MaybeError = { + val cwd = sys.props("user.dir") + implicit val formats = Utils.json4sDefaultFormats + + new EngineManifestSerializer + val rand = Random.alphanumeric.take(32).mkString + val ha = java.security.MessageDigest.getInstance("SHA-1"). 
+ digest(cwd.getBytes).map("%02x".format(_)).mkString + val em = EngineManifest( + id = rand, + version = ha, + name = new File(cwd).getName, + description = Some(manifestAutogenTag), + files = Seq(), + engineFactory = "") + try { + FileUtils.writeStringToFile(json, write(em), "ISO-8859-1") + Success + } catch { + case e: java.io.IOException => + logAndFail(s"Cannot generate ${json} automatically (${e.getMessage}). " + + "Aborting.") + } + } + + private def regenerateManifestJson(json: File): MaybeError = { + val cwd = sys.props("user.dir") + val ha = java.security.MessageDigest.getInstance("SHA-1"). + digest(cwd.getBytes).map("%02x".format(_)).mkString + if (json.exists) { + readManifestJson(json).right.flatMap { em => + if (em.description == Some(manifestAutogenTag) && ha != em.version) { + warn("This engine project directory contains an auto-generated " + + "manifest that has been copied/moved from another location. ") + warn("Regenerating the manifest to reflect the updated location. " + + "This will dissociate it from all previous engine instances.") + generateManifestJson(json) + } else { + logAndSucceed(s"Using existing engine manifest JSON at " + + s"${json.getCanonicalPath}") + } + } + } else { + generateManifestJson(json) + } + } + + private def detectSbt(sbt: Option[File], pioHome: String): String = { + sbt map { + _.getCanonicalPath + } getOrElse { + val f = new File(Seq(pioHome, "sbt", "sbt").mkString(File.separator)) + if (f.exists) f.getCanonicalPath else "sbt" + } + } + + private def outputSbtError(line: String): Unit = { + """\[.*error.*\]""".r findFirstIn line foreach { _ => error(line) } + } + + private def compile( + buildArgs: BuildArgs, pioHome: String, verbose: Boolean): MaybeError = { + // only add pioVersion to sbt if project/pio-build.sbt exists + if (new File("project", "pio-build.sbt").exists || buildArgs.forceGeneratePIOSbt) { + FileUtils.writeLines( + new File("pio.sbt"), + Seq( + "// Generated automatically by pio build.", + "// Changes in this file will be overridden.", + "", + "pioVersion := \"" + BuildInfo.version + "\"")) + } + implicit val formats = Utils.json4sDefaultFormats + + val sbt = detectSbt(buildArgs.sbt, pioHome) + info(s"Using command '${sbt}' at the current working directory to build.") + info("If the path above is incorrect, this process will fail.") + val asm = + if (buildArgs.sbtAssemblyPackageDependency) { + " assemblyPackageDependency" + } else { + "" + } + val clean = if (buildArgs.sbtClean) " clean" else "" + val buildCmd = s"${sbt} ${buildArgs.sbtExtra.getOrElse("")}${clean} " + + (if (buildArgs.uberJar) "assembly" else s"package${asm}") + val core = new File(s"pio-assembly-${BuildInfo.version}.jar") + if (buildArgs.uberJar) { + info(s"Uber JAR enabled. Putting ${core.getName} in lib.") + val dst = new File("lib") + dst.mkdir() + coreAssembly(pioHome) match { + case Right(coreFile) => + FileUtils.copyFileToDirectory( + coreFile, + dst, + true) + case Left(errStr) => return Left(errStr) + } + } else { + if (new File("engine.json").exists()) { + info(s"Uber JAR disabled. Making sure lib/${core.getName} is absent.") + new File("lib", core.getName).delete() + } else { + info("Uber JAR disabled, but current working directory does not look " + + s"like an engine project directory. 
Please delete lib/${core.getName} manually.") + } + } + info(s"Going to run: ${buildCmd}") + try { + val r = + if (verbose) { + buildCmd.!(ProcessLogger(line => info(line), line => error(line))) + } else { + buildCmd.!(ProcessLogger( + line => outputSbtError(line), + line => outputSbtError(line))) + } + if (r != 0) { + logAndFail(s"Return code of build command: ${buildCmd} is ${r}. Aborting.") + } else { + logAndSucceed("Compilation finished successfully.") + } + } catch { + case e: java.io.IOException => + logAndFail(s"Exception during compilation: ${e.getMessage}") + } + } + + def build( + buildArgs: BuildArgs, + pioHome: String, + manifestJson: File, + verbose: Boolean): MaybeError = { + + regenerateManifestJson(manifestJson) match { + case Left(err) => return Left(err) + case _ => Unit + } + + Template.verifyTemplateMinVersion(new File("template.json")) match { + case Left(err) => return Left(err) + case Right(_) => + compile(buildArgs, pioHome, verbose) + info("Looking for an engine...") + val jarFiles = jarFilesForScala + if (jarFiles.isEmpty) { + return logAndFail("No engine found. Your build might have failed. Aborting.") + } + jarFiles foreach { f => info(s"Found ${f.getName}")} + RegisterEngine.registerEngine( + manifestJson, + jarFiles, + false) + } + } + + /** Training an engine. + * The function starts a training process to be run concurrently. + * + * @param ea An instance of [[EngineArgs]] + * @param wa An instance of [[WorkflowArgs]] for running a single training. + * @param sa An instance of [[SparkArgs]] + * @param pioHome [[String]] with a path to PIO installation + * @param verbose A [[Boolean]] + * @return An instance of [[Expected]] containing either [[Left]] + * with an error message or [[Right]] with a handle to a process + * responsible for training and a function () => Unit, + * that must be called when the process is complete + */ + def train( + ea: EngineArgs, + wa: WorkflowArgs, + sa: SparkArgs, + pioHome: String, + verbose: Boolean = false): Expected[(Process, () => Unit)] = { + + regenerateManifestJson(ea.manifestJson) match { + case Left(err) => return Left(err) + case _ => Unit + } + + Template.verifyTemplateMinVersion(new File("template.json")).right.flatMap { + _ => + withRegisteredManifest(ea) { em => + RunWorkflow.runWorkflow(wa, sa, em, pioHome, verbose) + } + } + } + + /** Deploying an engine. + * The function starts a new process to be run concurrently. 
+ * + * @param ea An instance of [[EngineArgs]] + * @param engineInstanceId An optional engine instance ID + * @param serverArgs An instance of [[ServerArgs]] + * @param sparkArgs An instance of [[SparkArgs]] + * @param pioHome [[String]] with a path to PIO installation + * @param verbose A [[Boolean]] + * @return An instance of [[Expected]] containing either [[Left]] + * with an error message or [[Right]] with a handle to the process + * of a running engine and a function () => Unit, + * that must be called when the process is complete + */ + def deploy( + ea: EngineArgs, + engineInstanceId: Option[String], + serverArgs: ServerArgs, + sparkArgs: SparkArgs, + pioHome: String, + verbose: Boolean = false): Expected[(Process, () => Unit)] = { + + val verifyResult = Template.verifyTemplateMinVersion(new File("template.json")) + if (verifyResult.isLeft) { + return Left(verifyResult.left.get) + } + withRegisteredManifest(ea) { em => + val variantJson = parse(Source.fromFile(serverArgs.variantJson).mkString) + val variantId = variantJson \ "id" match { + case JString(s) => s + case _ => + return logAndFail("Unable to read engine variant ID from " + + s"${serverArgs.variantJson.getCanonicalPath}. Aborting.") + } + val engineInstances = storage.Storage.getMetaDataEngineInstances + val engineInstance = engineInstanceId map { eid => + engineInstances.get(eid) + } getOrElse { + engineInstances.getLatestCompleted(em.id, em.version, variantId) + } + engineInstance map { r => + RunServer.runServer(r.id, serverArgs, sparkArgs, em, pioHome, verbose) + } getOrElse { + engineInstanceId map { eid => + logAndFail(s"Invalid engine instance ID ${eid}. Aborting.") + } getOrElse { + logAndFail(s"No valid engine instance found for engine ${em.id} " + + s"${em.version}.\nTry running 'train' before 'deploy'. Aborting.") + } + } + } + } + + def undeploy(da: DeployArgs): MaybeError = { + + val serverUrl = s"http://${da.ip}:${da.port}" + info( + s"Undeploying any existing engine instance at ${serverUrl}") + try { + val code = Http(s"${serverUrl}/stop").asString.code + code match { + case 200 => Success + case 404 => + logAndFail(s"Another process is using ${serverUrl}. Unable to undeploy.") + case _ => + logAndFail(s"Another process is using ${serverUrl}, or an existing " + + s"engine server is not responding properly (HTTP ${code}). " + + "Unable to undeploy.") + } + } catch { + case e: java.net.ConnectException => + logAndFail(s"Nothing at ${serverUrl}") + case _: Throwable => + logAndFail("Another process might be occupying " + + s"${da.ip}:${da.port}. Unable to undeploy.") + } + } + + /** Running a driver on Spark. + * The function starts a process and returns immediately. + * + * @param mainClass A [[String]] with the class containing a main function to run + * @param driverArguments Arguments to be passed to the main function + * @param manifestJson An instance of [[File]] with the engine manifest 
+    *
+    * @param mainClass A [[String]] with the class containing a main function to run
+    * @param driverArguments Arguments to be passed to the main function
+    * @param manifestJson An instance of [[File]] with the engine manifest
+    * @param buildArgs An instance of [[BuildArgs]]
+    * @param sparkArgs An instance of [[SparkArgs]]
+    * @param pioHome [[String]] with a path to PIO installation
+    * @param verbose A [[Boolean]]
+    * @return An instance of [[Expected]] containing either [[Left]]
+    *         with an error message or [[Right]] with a handle to the process
+    *         of a running driver
+    */
+  def run(
+    mainClass: String,
+    driverArguments: Seq[String],
+    manifestJson: File,
+    buildArgs: BuildArgs,
+    sparkArgs: SparkArgs,
+    pioHome: String,
+    verbose: Boolean): Expected[Process] = {
+
+    generateManifestJson(manifestJson) match {
+      case Left(err) => return Left(err)
+      case _ => Unit
+    }
+
+    compile(buildArgs, pioHome, verbose)
+
+    val extraFiles = WorkflowUtils.thirdPartyConfFiles
+
+    val jarFiles = jarFilesForScala
+    jarFiles foreach { f => info(s"Found JAR: ${f.getName}") }
+    val allJarFiles = jarFiles.map(_.getCanonicalPath)
+    val cmd = s"${getSparkHome(sparkArgs.sparkHome)}/bin/spark-submit --jars " +
+      s"${allJarFiles.mkString(",")} " +
+      (if (extraFiles.size > 0) {
+        s"--files ${extraFiles.mkString(",")} "
+      } else {
+        ""
+      }) +
+      "--class " +
+      s"${mainClass} ${sparkArgs.sparkPassThrough.mkString(" ")} " +
+      coreAssembly(pioHome) + " " +
+      driverArguments.mkString(" ")
+    info(s"Submission command: ${cmd}")
+    Right(Process(
+      cmd,
+      None,
+      "SPARK_YARN_USER_ENV" -> sys.env.filter(kv => kv._1.startsWith("PIO_")).
+        map(kv => s"${kv._1}=${kv._2}").mkString(",")).run())
+  }
+
+  def unregister(jsonManifest: File): MaybeError = {
+    RegisterEngine.unregisterEngine(jsonManifest)
+  }
+
+}
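
The Either-based signatures above are designed to compose: a caller chains
build and train through right projections and handles a single Left at the
end, instead of checking integer exit codes after each step. A minimal
sketch, assuming the enclosing object is Engine, that the *Args case classes
have no-argument defaults, and an illustrative pioHome path (none of these
assumptions are shown in this hunk):

    import java.io.File
    import scala.sys.process.Process
    import org.apache.predictionio.tools.ReturnTypes._

    val pioHome = "/opt/pio" // illustrative installation path
    // Build, then train; the first Left(errorMessage) short-circuits.
    val trained: Expected[(Process, () => Unit)] =
      Engine.build(BuildArgs(), pioHome, new File("manifest.json"), verbose = false)
        .right.flatMap { _ =>
          Engine.train(EngineArgs(), WorkflowArgs(), SparkArgs(), pioHome)
        }
    trained.right.foreach { case (training, onTermination) =>
      training.exitValue() // wait for the Spark training process to finish
      onTermination()      // per the train scaladoc, call once the process is complete
    }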

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala
new file mode 100644
index 0000000..ed6b487
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Export.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.tools.Runner
+import org.apache.predictionio.tools.SparkArgs
+import org.apache.predictionio.tools.ReturnTypes._
+
+import scala.sys.process._
+
+case class ExportArgs(
+  appId: Int = 0,
+  channel: Option[String] = None,
+  outputPath: String = "",
+  format: String = "json")
+
+object Export {
+  def eventsToFile(
+    ea: ExportArgs,
+    sa: SparkArgs,
+    pioHome: String): Expected[(Process, () => Unit)] = {
+
+    val channelArg = ea.channel
+      .map(ch => Seq("--channel", ch)).getOrElse(Nil)
+    Runner.runOnSpark(
+      "org.apache.predictionio.tools.export.EventsToFile",
+      Seq(
+        "--appid",
+        ea.appId.toString,
+        "--output",
+        ea.outputPath,
+        "--format",
+        ea.format) ++ channelArg,
+      sa,
+      Nil,
+      pioHome)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala
new file mode 100644
index 0000000..9fac559
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Import.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.tools.Runner
+import org.apache.predictionio.tools.SparkArgs
+import org.apache.predictionio.tools.ReturnTypes._
+
+import scala.sys.process._
+
+case class ImportArgs(
+  appId: Int = 0,
+  channel: Option[String] = None,
+  inputPath: String = "")
+
+object Import {
+  def fileToEvents(
+    ia: ImportArgs,
+    sa: SparkArgs,
+    pioHome: String): Expected[(Process, () => Unit)] = {
+
+    val channelArg = ia.channel
+      .map(ch => Seq("--channel", ch)).getOrElse(Nil)
+    Runner.runOnSpark(
+      "org.apache.predictionio.tools.imprt.FileToEvents",
+      Seq(
+        "--appid",
+        ia.appId.toString,
+        "--input",
+        ia.inputPath) ++ channelArg,
+      sa,
+      Nil,
+      pioHome)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
new file mode 100644
index 0000000..10aca41
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.commands
+
+import org.apache.predictionio.core.BuildInfo
+import org.apache.predictionio.data.storage
+import org.apache.predictionio.data.api.EventServer
+import org.apache.predictionio.data.api.EventServerConfig
+import org.apache.predictionio.tools.EventServerArgs
+import org.apache.predictionio.tools.EitherLogging
+import org.apache.predictionio.tools.Common
+import org.apache.predictionio.tools.ReturnTypes._
+import org.apache.predictionio.tools.dashboard.Dashboard
+import org.apache.predictionio.tools.dashboard.DashboardConfig
+import org.apache.predictionio.tools.admin.AdminServer
+import org.apache.predictionio.tools.admin.AdminServerConfig
+
+import akka.actor.ActorSystem
+import java.io.File
+import scala.io.Source
+import semverfi._
+
+case class DashboardArgs(
+  ip: String = "127.0.0.1",
+  port: Int = 9000)
+
+case class AdminServerArgs(
+  ip: String = "127.0.0.1",
+  port: Int = 7071)
+
+case class PioStatus(
+  version: String = "",
+  pioHome: String = "",
+  sparkHome: String = "",
+  sparkVersion: String = "",
+  sparkMinVersion: String = "",
+  warnings: Seq[String] = Seq())
+
+object Management extends EitherLogging {
+
+  def version(): String = BuildInfo.version
+
+  /** Starts a dashboard server and returns immediately.
+    *
+    * @param da An instance of [[DashboardArgs]]
+    * @return An instance of [[ActorSystem]] in which the server is being executed
+    */
+  def dashboard(da: DashboardArgs): ActorSystem = {
+    info(s"Creating dashboard at ${da.ip}:${da.port}")
+    Dashboard.createDashboard(DashboardConfig(
+      ip = da.ip,
+      port = da.port))
+  }
+
+  /** Starts an event server and returns immediately.
+    *
+    * @param ea An instance of [[EventServerArgs]]
+    * @return An instance of [[ActorSystem]] in which the server is being executed
+    */
+  def eventserver(ea: EventServerArgs): ActorSystem = {
+    info(s"Creating Event Server at ${ea.ip}:${ea.port}")
+    EventServer.createEventServer(EventServerConfig(
+      ip = ea.ip,
+      port = ea.port,
+      stats = ea.stats))
+  }
+
+  /** Starts an admin server and returns immediately.
+    *
+    * @param aa An instance of [[AdminServerArgs]]
+    * @return An instance of [[ActorSystem]] in which the server is being executed
+    */
+  def adminserver(aa: AdminServerArgs): ActorSystem = {
+    info(s"Creating Admin Server at ${aa.ip}:${aa.port}")
+    AdminServer.createAdminServer(AdminServerConfig(
+      ip = aa.ip,
+      port = aa.port
+    ))
+  }
+
+  private def stripMarginAndNewlines(string: String): String =
+    string.stripMargin.replaceAll("\n", " ")
+
+  def status(pioHome: Option[String], sparkHome: Option[String]): Expected[PioStatus] = {
+    var pioStatus = PioStatus()
+    info("Inspecting PredictionIO...")
+    pioHome map { pioHome =>
+      info(s"PredictionIO ${BuildInfo.version} is installed at $pioHome")
+      pioStatus = pioStatus.copy(version = version(), pioHome = pioHome)
+    } getOrElse {
+      return logAndFail("Unable to locate PredictionIO installation. Aborting.")
+    }
+    info("Inspecting Apache Spark...")
+    val sparkHomePath = Common.getSparkHome(sparkHome)
+    if (new File(s"$sparkHomePath/bin/spark-submit").exists) {
+      info(s"Apache Spark is installed at $sparkHomePath")
+      val sparkMinVersion = "1.3.0"
+      pioStatus = pioStatus.copy(
+        sparkHome = sparkHomePath,
+        sparkMinVersion = sparkMinVersion)
+      val sparkReleaseFile = new File(s"$sparkHomePath/RELEASE")
+      if (sparkReleaseFile.exists) {
+        val sparkReleaseStrings =
+          Source.fromFile(sparkReleaseFile).mkString.split(' ')
+        if (sparkReleaseStrings.length < 2) {
+          val warning = (stripMarginAndNewlines(
+            s"""|Apache Spark version information cannot be found (RELEASE file
+                |is empty). This is a known issue for certain vendors (e.g.
+                |Cloudera). Please make sure you are using a version of at least
+                |$sparkMinVersion."""))
+          warn(warning)
+          pioStatus = pioStatus.copy(warnings = pioStatus.warnings :+ warning)
+        } else {
+          val sparkReleaseVersion = sparkReleaseStrings(1)
+          val parsedMinVersion = Version.apply(sparkMinVersion)
+          val parsedCurrentVersion = Version.apply(sparkReleaseVersion)
+          if (parsedCurrentVersion >= parsedMinVersion) {
+            info(stripMarginAndNewlines(
+              s"""|Apache Spark $sparkReleaseVersion detected (meets minimum
+                  |requirement of $sparkMinVersion)"""))
+            pioStatus = pioStatus.copy(sparkVersion = sparkReleaseVersion)
+          } else {
+            return logAndFail(stripMarginAndNewlines(
+              s"""|Apache Spark $sparkReleaseVersion detected (does not meet
+                  |minimum requirement of $sparkMinVersion). Aborting."""))
+          }
+        }
+      } else {
+        val warning = (stripMarginAndNewlines(
+          s"""|Apache Spark version information cannot be found. If you are
+              |using a developmental tree, please make sure you are using a
+              |version of at least $sparkMinVersion."""))
+        warn(warning)
+        pioStatus = pioStatus.copy(warnings = pioStatus.warnings :+ warning)
+      }
+    } else {
+      return logAndFail("Unable to locate a proper Apache Spark installation. Aborting.")
+    }
+    info("Inspecting storage backend connections...")
+    try {
+      storage.Storage.verifyAllDataObjects()
+    } catch {
+      case e: Throwable =>
+        val errStr = s"""Unable to connect to all storage backends successfully.
+          |The following shows the error message from the storage backend.
+          |${e.getMessage} (${e.getClass.getName})
+          |Dumping configuration of initialized storage backend sources.
+          |Please make sure they are correct.
+          |""".stripMargin
+        val sources = storage.Storage.config.get("sources") map { src =>
+          src map { case (s, p) =>
+            s"Source Name: $s; Type: ${p.getOrElse("type", "(error)")}; " +
+              s"Configuration: ${p.getOrElse("config", "(error)")}"
+          } mkString("\n")
+        } getOrElse {
+          "No properly configured storage backend sources."
+        }
+        return logAndFail(errStr + sources)
+    }
+    info("Your system is all ready to go.")
+    Right(pioStatus)
+  }
+}
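
With status now returning Expected[PioStatus] rather than printing and
exiting, a console caller can fold the result and surface the accumulated
warnings itself. A hypothetical sketch (the argument values are illustrative
placeholders):

    import org.apache.predictionio.tools.commands.{Management, PioStatus}

    Management.status(pioHome = Some("/opt/pio"), sparkHome = None) match {
      case Right(s: PioStatus) =>
        println(s"PredictionIO ${s.version} at ${s.pioHome}, Spark ${s.sparkVersion}")
        s.warnings.foreach(w => println(s"WARNING: $w"))
      case Left(error) =>
        println(s"Status check failed: $error")
    }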

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala
new file mode 100644
index 0000000..1476598
--- /dev/null
+++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Template.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.tools.commands
+
+import java.io.File
+
+import scala.io.Source
+
+import grizzled.slf4j.Logging
+import org.apache.predictionio.core.BuildInfo
+import org.apache.predictionio.tools.EitherLogging
+import org.apache.predictionio.tools.ReturnTypes._
+import org.json4s._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.{write, read}
+import semverfi._
+
+case class TemplateMetaData(
+  pioVersionMin: Option[String] = None)
+
+object Template extends EitherLogging {
+
+  def templateMetaData(templateJson: File): TemplateMetaData = {
+    if (!templateJson.exists) {
+      warn(s"$templateJson does not exist. Template metadata will not be available. " +
+        "(This is safe to ignore if you are not working on a template.)")
+      TemplateMetaData()
+    } else {
+      val jsonString = Source.fromFile(templateJson)(scala.io.Codec.ISO8859).mkString
+      val json = try {
+        parse(jsonString)
+      } catch {
+        case e: org.json4s.ParserUtil.ParseException =>
+          warn(s"$templateJson cannot be parsed. Template metadata will not be available.")
+          return TemplateMetaData()
+      }
+      val pioVersionMin = json \ "pio" \ "version" \ "min"
+      pioVersionMin match {
+        case JString(s) => TemplateMetaData(pioVersionMin = Some(s))
+        case _ => TemplateMetaData()
+      }
+    }
+  }
+
+  def verifyTemplateMinVersion(templateJsonFile: File): MaybeError = {
+    val metadata = templateMetaData(templateJsonFile)
+
+    for (pvm <- metadata.pioVersionMin) {
+      if (Version(BuildInfo.version) < Version(pvm)) {
+        return logAndFail(s"This engine template requires at least PredictionIO $pvm. " +
+          s"The template may not work with PredictionIO ${BuildInfo.version}.")
+      }
+    }
+    Success
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/e4a3c0c9/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
----------------------------------------------------------------------
diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
deleted file mode 100644
index fcb9608..0000000
--- a/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.tools.console
-
-import org.apache.predictionio.data.storage
-
-import grizzled.slf4j.Logging
-
-case class AccessKeyArgs(
-  accessKey: String = "",
-  events: Seq[String] = Seq())
-
-object AccessKey extends Logging {
-  def create(ca: ConsoleArgs): Int = {
-    val apps = storage.Storage.getMetaDataApps
-    apps.getByName(ca.app.name) map { app =>
-      val accessKeys = storage.Storage.getMetaDataAccessKeys
-      val accessKey = accessKeys.insert(storage.AccessKey(
-        key = ca.accessKey.accessKey,
-        appid = app.id,
-        events = ca.accessKey.events))
-      accessKey map { k =>
-        info(s"Created new access key: ${k}")
-        0
-      } getOrElse {
-        error(s"Unable to create new access key.")
-        1
-      }
-    } getOrElse {
-      error(s"App ${ca.app.name} does not exist. Aborting.")
-      1
-    }
-  }
-
-  def list(ca: ConsoleArgs): Int = {
-    val keys =
-      if (ca.app.name == "") {
-        storage.Storage.getMetaDataAccessKeys.getAll
-      } else {
-        val apps = storage.Storage.getMetaDataApps
-        apps.getByName(ca.app.name) map { app =>
-          storage.Storage.getMetaDataAccessKeys.getByAppid(app.id)
-        } getOrElse {
-          error(s"App ${ca.app.name} does not exist. Aborting.")
-          return 1
-        }
-      }
-    val title = "Access Key(s)"
-    info(f"$title%64s | App ID | Allowed Event(s)")
-    keys.sortBy(k => k.appid) foreach { k =>
-      val events =
-        if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)"
-      info(f"${k.key}%64s | ${k.appid}%6d | $events%s")
-    }
-    info(s"Finished listing ${keys.size} access key(s).")
-    0
-  }
-
-  def delete(ca: ConsoleArgs): Int = {
-    try {
-      storage.Storage.getMetaDataAccessKeys.delete(ca.accessKey.accessKey)
-      info(s"Deleted access key ${ca.accessKey.accessKey}.")
-      0
-    } catch {
-      case e: Exception =>
-        error(s"Error deleting access key ${ca.accessKey.accessKey}.", e)
-        1
-    }
-  }
-}

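At the console boundary, such results presumably collapse back into process
exit codes (the new Pio.scala added by this commit appears to play that
role); a one-line sketch under that assumption, with a hypothetical helper
name:

    import org.apache.predictionio.tools.ReturnTypes._

    // Left(error) => exit code 1; Right(_) => 0. The error text is assumed
    // to have been logged already inside the command via EitherLogging.
    def exitCode(result: MaybeError): Int = result.fold(_ => 1, _ => 0)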