http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala b/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala deleted file mode 100644 index 3d2c888..0000000 --- a/tools/src/main/scala/io/prediction/tools/dashboard/CorsSupport.scala +++ /dev/null @@ -1,75 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.tools.dashboard - -// Reference from: https://gist.github.com/waymost/4b5598523c2c7361abea - -import spray.http.{HttpMethods, HttpMethod, HttpResponse, AllOrigins} -import spray.http.HttpHeaders._ -import spray.http.HttpMethods._ -import spray.http.HttpEntity -import spray.routing._ -import spray.http.StatusCodes -import spray.http.ContentTypes - -// see also https://developer.mozilla.org/en-US/docs/Web/HTTP/Access_control_CORS -trait CORSSupport { - this: HttpService => - - private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins) - private val optionsCorsHeaders = List( - `Access-Control-Allow-Headers`("""Origin, - |X-Requested-With, - |Content-Type, - |Accept, - |Accept-Encoding, - |Accept-Language, - |Host, - |Referer, - |User-Agent""".stripMargin.replace("\n", " ")), - `Access-Control-Max-Age`(1728000) - ) - - def cors[T]: Directive0 = mapRequestContext { ctx => - ctx.withRouteResponseHandling { - // OPTION request for a resource that responds to other methods - case Rejected(x) if (ctx.request.method.equals(HttpMethods.OPTIONS) && - x.exists(_.isInstanceOf[MethodRejection])) => { - val allowedMethods: List[HttpMethod] = x.collect { - case rejection: MethodRejection => rejection.supported - } - ctx.complete { - HttpResponse().withHeaders( - `Access-Control-Allow-Methods`(HttpMethods.OPTIONS, allowedMethods :_*) :: - allowOriginHeader :: - optionsCorsHeaders - ) - } - } - }.withHttpResponseHeadersMapped { headers => - allowOriginHeader :: headers - } - } - - override def timeoutRoute: StandardRoute = complete { - HttpResponse( - StatusCodes.InternalServerError, - HttpEntity(ContentTypes.`text/plain(UTF-8)`, - "The server was not able to produce a timely response to your request."), - List(allowOriginHeader) - ) - } -}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala b/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala deleted file mode 100644 index 154ba4e..0000000 --- a/tools/src/main/scala/io/prediction/tools/dashboard/Dashboard.scala +++ /dev/null @@ -1,156 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.tools.dashboard - -import com.typesafe.config.ConfigFactory -import io.prediction.authentication.KeyAuthentication -import io.prediction.configuration.SSLConfiguration -import io.prediction.data.storage.Storage -import spray.can.server.ServerSettings -import spray.routing.directives.AuthMagnet -import scala.concurrent.{Future, ExecutionContext} -import akka.actor.{ActorContext, Actor, ActorSystem, Props} -import akka.io.IO -import akka.pattern.ask -import akka.util.Timeout -import com.github.nscala_time.time.Imports.DateTime -import grizzled.slf4j.Logging -import spray.can.Http -import spray.http._ -import spray.http.MediaTypes._ -import spray.routing._ -import spray.routing.authentication.{Authentication, UserPass, BasicAuth} - -import scala.concurrent.duration._ - -case class DashboardConfig( - ip: String = "localhost", - port: Int = 9000) - -object Dashboard extends Logging with SSLConfiguration{ - def main(args: Array[String]): Unit = { - val parser = new scopt.OptionParser[DashboardConfig]("Dashboard") { - opt[String]("ip") action { (x, c) => - c.copy(ip = x) - } text("IP to bind to (default: localhost).") - opt[Int]("port") action { (x, c) => - c.copy(port = x) - } text("Port to bind to (default: 9000).") - } - - parser.parse(args, DashboardConfig()) map { dc => - createDashboard(dc) - } - } - - def createDashboard(dc: DashboardConfig): Unit = { - implicit val system = ActorSystem("pio-dashboard") - val service = - system.actorOf(Props(classOf[DashboardActor], dc), "dashboard") - implicit val timeout = Timeout(5.seconds) - val settings = ServerSettings(system) - IO(Http) ? Http.Bind( - service, - interface = dc.ip, - port = dc.port, - settings = Some(settings.copy(sslEncryption = true))) - system.awaitTermination - } -} - -class DashboardActor( - val dc: DashboardConfig) - extends Actor with DashboardService { - def actorRefFactory: ActorContext = context - def receive: Actor.Receive = runRoute(dashboardRoute) -} - -trait DashboardService extends HttpService with KeyAuthentication with CORSSupport { - - implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher - val dc: DashboardConfig - val evaluationInstances = Storage.getMetaDataEvaluationInstances - val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")) - val serverStartTime = DateTime.now - val dashboardRoute = - path("") { - authenticate(withAccessKeyFromFile) { request => - get { - respondWithMediaType(`text/html`) { - complete { - val completedInstances = evaluationInstances.getCompleted - html.index( - dc, - serverStartTime, - pioEnvVars, - completedInstances).toString - } - } - } - } - } ~ - pathPrefix("engine_instances" / Segment) { instanceId => - path("evaluator_results.txt") { - get { - respondWithMediaType(`text/plain`) { - evaluationInstances.get(instanceId).map { i => - complete(i.evaluatorResults) - } getOrElse { - complete(StatusCodes.NotFound) - } - } - } - } ~ - path("evaluator_results.html") { - get { - respondWithMediaType(`text/html`) { - evaluationInstances.get(instanceId).map { i => - complete(i.evaluatorResultsHTML) - } getOrElse { - complete(StatusCodes.NotFound) - } - } - } - } ~ - path("evaluator_results.json") { - get { - respondWithMediaType(`application/json`) { - evaluationInstances.get(instanceId).map { i => - complete(i.evaluatorResultsJSON) - } getOrElse { - complete(StatusCodes.NotFound) - } - } - } - } ~ - cors { - path("local_evaluator_results.json") { - get { - respondWithMediaType(`application/json`) { - evaluationInstances.get(instanceId).map { i => - complete(i.evaluatorResultsJSON) - } getOrElse { - complete(StatusCodes.NotFound) - } - } - } - } - } - } ~ - pathPrefix("assets") { - getFromResourceDirectory("assets") - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala b/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala deleted file mode 100644 index 743d57a..0000000 --- a/tools/src/main/scala/io/prediction/tools/export/EventsToFile.scala +++ /dev/null @@ -1,104 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.tools.export - -import io.prediction.controller.Utils -import io.prediction.data.storage.EventJson4sSupport -import io.prediction.data.storage.Storage -import io.prediction.tools.Runner -import io.prediction.workflow.WorkflowContext -import io.prediction.workflow.WorkflowUtils - -import grizzled.slf4j.Logging -import org.apache.spark.sql.SQLContext -import org.json4s.native.Serialization._ - -case class EventsToFileArgs( - env: String = "", - logFile: String = "", - appId: Int = 0, - channel: Option[String] = None, - outputPath: String = "", - format: String = "parquet", - verbose: Boolean = false, - debug: Boolean = false) - -object EventsToFile extends Logging { - def main(args: Array[String]): Unit = { - val parser = new scopt.OptionParser[EventsToFileArgs]("EventsToFile") { - opt[String]("env") action { (x, c) => - c.copy(env = x) - } - opt[String]("log-file") action { (x, c) => - c.copy(logFile = x) - } - opt[Int]("appid") action { (x, c) => - c.copy(appId = x) - } - opt[String]("channel") action { (x, c) => - c.copy(channel = Some(x)) - } - opt[String]("format") action { (x, c) => - c.copy(format = x) - } - opt[String]("output") action { (x, c) => - c.copy(outputPath = x) - } - opt[Unit]("verbose") action { (x, c) => - c.copy(verbose = true) - } - opt[Unit]("debug") action { (x, c) => - c.copy(debug = true) - } - } - parser.parse(args, EventsToFileArgs()) map { args => - // get channelId - val channels = Storage.getMetaDataChannels - val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap - - val channelId: Option[Int] = args.channel.map { ch => - if (!channelMap.contains(ch)) { - error(s"Channel ${ch} doesn't exist in this app.") - sys.exit(1) - } - - channelMap(ch) - } - - val channelStr = args.channel.map(n => " Channel " + n).getOrElse("") - - WorkflowUtils.modifyLogging(verbose = args.verbose) - @transient lazy implicit val formats = Utils.json4sDefaultFormats + - new EventJson4sSupport.APISerializer - val sc = WorkflowContext( - mode = "Export", - batch = "App ID " + args.appId + channelStr, - executorEnv = Runner.envStringToMap(args.env)) - val sqlContext = new SQLContext(sc) - val events = Storage.getPEvents() - val eventsRdd = events.find(appId = args.appId, channelId = channelId)(sc) - val jsonStringRdd = eventsRdd.map(write(_)) - if (args.format == "json") { - jsonStringRdd.saveAsTextFile(args.outputPath) - } else { - val jsonRdd = sqlContext.jsonRDD(jsonStringRdd) - jsonRdd.saveAsParquetFile(args.outputPath) - } - info(s"Events are exported to ${args.outputPath}/.") - info("Done.") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala b/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala deleted file mode 100644 index 9a19a33..0000000 --- a/tools/src/main/scala/io/prediction/tools/imprt/FileToEvents.scala +++ /dev/null @@ -1,103 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.tools.imprt - -import io.prediction.controller.Utils -import io.prediction.data.storage.Event -import io.prediction.data.storage.EventJson4sSupport -import io.prediction.data.storage.Storage -import io.prediction.tools.Runner -import io.prediction.workflow.WorkflowContext -import io.prediction.workflow.WorkflowUtils - -import grizzled.slf4j.Logging -import org.json4s.native.Serialization._ - -import scala.util.{Failure, Try} - -case class FileToEventsArgs( - env: String = "", - logFile: String = "", - appId: Int = 0, - channel: Option[String] = None, - inputPath: String = "", - verbose: Boolean = false, - debug: Boolean = false) - -object FileToEvents extends Logging { - def main(args: Array[String]): Unit = { - val parser = new scopt.OptionParser[FileToEventsArgs]("FileToEvents") { - opt[String]("env") action { (x, c) => - c.copy(env = x) - } - opt[String]("log-file") action { (x, c) => - c.copy(logFile = x) - } - opt[Int]("appid") action { (x, c) => - c.copy(appId = x) - } - opt[String]("channel") action { (x, c) => - c.copy(channel = Some(x)) - } - opt[String]("input") action { (x, c) => - c.copy(inputPath = x) - } - opt[Unit]("verbose") action { (x, c) => - c.copy(verbose = true) - } - opt[Unit]("debug") action { (x, c) => - c.copy(debug = true) - } - } - parser.parse(args, FileToEventsArgs()) map { args => - // get channelId - val channels = Storage.getMetaDataChannels - val channelMap = channels.getByAppid(args.appId).map(c => (c.name, c.id)).toMap - - val channelId: Option[Int] = args.channel.map { ch => - if (!channelMap.contains(ch)) { - error(s"Channel ${ch} doesn't exist in this app.") - sys.exit(1) - } - - channelMap(ch) - } - - val channelStr = args.channel.map(n => " Channel " + n).getOrElse("") - - WorkflowUtils.modifyLogging(verbose = args.verbose) - @transient lazy implicit val formats = Utils.json4sDefaultFormats + - new EventJson4sSupport.APISerializer - val sc = WorkflowContext( - mode = "Import", - batch = "App ID " + args.appId + channelStr, - executorEnv = Runner.envStringToMap(args.env)) - val rdd = sc.textFile(args.inputPath).filter(_.trim.nonEmpty).map { json => - Try(read[Event](json)).recoverWith { - case e: Throwable => - error(s"\nmalformed json => $json") - Failure(e) - }.get - } - val events = Storage.getPEvents() - events.write(events = rdd, - appId = args.appId, - channelId = channelId)(sc) - info("Events are imported.") - info("Done.") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala new file mode 100644 index 0000000..1640d55 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/RegisterEngine.scala @@ -0,0 +1,84 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools + +import java.io.File + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.data.storage.EngineManifestSerializer +import org.apache.predictionio.data.storage.Storage +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.native.Serialization.read + +import scala.io.Source + +object RegisterEngine extends Logging { + val engineManifests = Storage.getMetaDataEngineManifests + implicit val formats = DefaultFormats + new EngineManifestSerializer + + def registerEngine( + jsonManifest: File, + engineFiles: Seq[File], + copyLocal: Boolean = false): Unit = { + val jsonString = try { + Source.fromFile(jsonManifest).mkString + } catch { + case e: java.io.FileNotFoundException => + error(s"Engine manifest file not found: ${e.getMessage}. Aborting.") + sys.exit(1) + } + val engineManifest = read[EngineManifest](jsonString) + + info(s"Registering engine ${engineManifest.id} ${engineManifest.version}") + engineManifests.update( + engineManifest.copy(files = engineFiles.map(_.toURI.toString)), true) + } + + def unregisterEngine(jsonManifest: File): Unit = { + val jsonString = try { + Source.fromFile(jsonManifest).mkString + } catch { + case e: java.io.FileNotFoundException => + error(s"Engine manifest file not found: ${e.getMessage}. Aborting.") + sys.exit(1) + } + val fileEngineManifest = read[EngineManifest](jsonString) + val engineManifest = engineManifests.get( + fileEngineManifest.id, + fileEngineManifest.version) + + engineManifest map { em => + val conf = new Configuration + val fs = FileSystem.get(conf) + + em.files foreach { f => + val path = new Path(f) + info(s"Removing ${f}") + fs.delete(path, false) + } + + engineManifests.delete(em.id, em.version) + info(s"Unregistered engine ${em.id} ${em.version}") + } getOrElse { + error(s"${fileEngineManifest.id} ${fileEngineManifest.version} is not " + + "registered.") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala new file mode 100644 index 0000000..5dae46b --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/RunServer.scala @@ -0,0 +1,178 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools + +import java.io.File +import java.net.URI + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.tools.console.ConsoleArgs +import org.apache.predictionio.workflow.WorkflowUtils + +import scala.sys.process._ + +object RunServer extends Logging { + def runServer( + ca: ConsoleArgs, + core: File, + em: EngineManifest, + engineInstanceId: String): Int = { + val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv => + s"${kv._1}=${kv._2}" + ).mkString(",") + + val sparkHome = ca.common.sparkHome.getOrElse( + sys.env.getOrElse("SPARK_HOME", ".")) + + val extraFiles = WorkflowUtils.thirdPartyConfFiles + + val driverClassPathIndex = + ca.common.sparkPassThrough.indexOf("--driver-class-path") + val driverClassPathPrefix = + if (driverClassPathIndex != -1) { + Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1)) + } else { + Seq() + } + val extraClasspaths = + driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths + + val deployModeIndex = + ca.common.sparkPassThrough.indexOf("--deploy-mode") + val deployMode = if (deployModeIndex != -1) { + ca.common.sparkPassThrough(deployModeIndex + 1) + } else { + "client" + } + + val mainJar = + if (ca.build.uberJar) { + if (deployMode == "cluster") { + em.files.filter(_.startsWith("hdfs")).head + } else { + em.files.filterNot(_.startsWith("hdfs")).head + } + } else { + if (deployMode == "cluster") { + em.files.filter(_.contains("pio-assembly")).head + } else { + core.getCanonicalPath + } + } + + val jarFiles = (em.files ++ Option(new File(ca.common.pioHome.get, "plugins") + .listFiles()).getOrElse(Array.empty[File]).map(_.getAbsolutePath)).mkString(",") + + val sparkSubmit = + Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++ + ca.common.sparkPassThrough ++ + Seq( + "--class", + "org.apache.predictionio.workflow.CreateServer", + "--name", + s"PredictionIO Engine Instance: ${engineInstanceId}") ++ + (if (!ca.build.uberJar) { + Seq("--jars", jarFiles) + } else Seq()) ++ + (if (extraFiles.size > 0) { + Seq("--files", extraFiles.mkString(",")) + } else { + Seq() + }) ++ + (if (extraClasspaths.size > 0) { + Seq("--driver-class-path", extraClasspaths.mkString(":")) + } else { + Seq() + }) ++ + (if (ca.common.sparkKryo) { + Seq( + "--conf", + "spark.serializer=org.apache.spark.serializer.KryoSerializer") + } else { + Seq() + }) ++ + Seq( + mainJar, + "--engineInstanceId", + engineInstanceId, + "--ip", + ca.deploy.ip, + "--port", + ca.deploy.port.toString, + "--event-server-ip", + ca.eventServer.ip, + "--event-server-port", + ca.eventServer.port.toString) ++ + (if (ca.accessKey.accessKey != "") { + Seq("--accesskey", ca.accessKey.accessKey) + } else { + Seq() + }) ++ + (if (ca.eventServer.enabled) Seq("--feedback") else Seq()) ++ + (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ + (if (ca.common.verbose) Seq("--verbose") else Seq()) ++ + ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Seq()) ++ + ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Seq()) ++ + Seq("--json-extractor", ca.common.jsonExtractor.toString) + + info(s"Submission command: ${sparkSubmit.mkString(" ")}") + + val proc = + Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).run() + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + def run(): Unit = { + proc.destroy() + } + })) + proc.exitValue() + } + + def newRunServer( + ca: ConsoleArgs, + em: EngineManifest, + engineInstanceId: String): Int = { + val jarFiles = em.files.map(new URI(_)) ++ + Option(new File(ca.common.pioHome.get, "plugins").listFiles()) + .getOrElse(Array.empty[File]).map(_.toURI) + val args = Seq( + "--engineInstanceId", + engineInstanceId, + "--engine-variant", + ca.common.variantJson.toURI.toString, + "--ip", + ca.deploy.ip, + "--port", + ca.deploy.port.toString, + "--event-server-ip", + ca.eventServer.ip, + "--event-server-port", + ca.eventServer.port.toString) ++ + (if (ca.accessKey.accessKey != "") { + Seq("--accesskey", ca.accessKey.accessKey) + } else { + Nil + }) ++ + (if (ca.eventServer.enabled) Seq("--feedback") else Nil) ++ + (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Nil) ++ + (if (ca.common.verbose) Seq("--verbose") else Nil) ++ + ca.deploy.logUrl.map(x => Seq("--log-url", x)).getOrElse(Nil) ++ + ca.deploy.logPrefix.map(x => Seq("--log-prefix", x)).getOrElse(Nil) ++ + Seq("--json-extractor", ca.common.jsonExtractor.toString) + + Runner.runOnSpark("org.apache.predictionio.workflow.CreateServer", args, ca, jarFiles) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala new file mode 100644 index 0000000..4b42f40 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala @@ -0,0 +1,212 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools + +import java.io.File +import java.net.URI + +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.EngineManifest +import org.apache.predictionio.tools.console.ConsoleArgs +import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path + +import scala.sys.process._ + +object RunWorkflow extends Logging { + def runWorkflow( + ca: ConsoleArgs, + core: File, + em: EngineManifest, + variantJson: File): Int = { + // Collect and serialize PIO_* environmental variables + val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv => + s"${kv._1}=${kv._2}" + ).mkString(",") + + val sparkHome = ca.common.sparkHome.getOrElse( + sys.env.getOrElse("SPARK_HOME", ".")) + + val hadoopConf = new Configuration + val hdfs = FileSystem.get(hadoopConf) + + val driverClassPathIndex = + ca.common.sparkPassThrough.indexOf("--driver-class-path") + val driverClassPathPrefix = + if (driverClassPathIndex != -1) { + Seq(ca.common.sparkPassThrough(driverClassPathIndex + 1)) + } else { + Seq() + } + val extraClasspaths = + driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths + + val deployModeIndex = + ca.common.sparkPassThrough.indexOf("--deploy-mode") + val deployMode = if (deployModeIndex != -1) { + ca.common.sparkPassThrough(deployModeIndex + 1) + } else { + "client" + } + + val extraFiles = WorkflowUtils.thirdPartyConfFiles + + val mainJar = + if (ca.build.uberJar) { + if (deployMode == "cluster") { + em.files.filter(_.startsWith("hdfs")).head + } else { + em.files.filterNot(_.startsWith("hdfs")).head + } + } else { + if (deployMode == "cluster") { + em.files.filter(_.contains("pio-assembly")).head + } else { + core.getCanonicalPath + } + } + + val workMode = + ca.common.evaluation.map(_ => "Evaluation").getOrElse("Training") + + val engineLocation = Seq( + sys.env("PIO_FS_ENGINESDIR"), + em.id, + em.version) + + if (deployMode == "cluster") { + val dstPath = new Path(engineLocation.mkString(Path.SEPARATOR)) + info("Cluster deploy mode detected. Trying to copy " + + s"${variantJson.getCanonicalPath} to " + + s"${hdfs.makeQualified(dstPath).toString}.") + hdfs.copyFromLocalFile(new Path(variantJson.toURI), dstPath) + } + + val sparkSubmit = + Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) ++ + ca.common.sparkPassThrough ++ + Seq( + "--class", + "org.apache.predictionio.workflow.CreateWorkflow", + "--name", + s"PredictionIO $workMode: ${em.id} ${em.version} (${ca.common.batch})") ++ + (if (!ca.build.uberJar) { + Seq("--jars", em.files.mkString(",")) + } else Seq()) ++ + (if (extraFiles.size > 0) { + Seq("--files", extraFiles.mkString(",")) + } else { + Seq() + }) ++ + (if (extraClasspaths.size > 0) { + Seq("--driver-class-path", extraClasspaths.mkString(":")) + } else { + Seq() + }) ++ + (if (ca.common.sparkKryo) { + Seq( + "--conf", + "spark.serializer=org.apache.spark.serializer.KryoSerializer") + } else { + Seq() + }) ++ + Seq( + mainJar, + "--env", + pioEnvVars, + "--engine-id", + em.id, + "--engine-version", + em.version, + "--engine-variant", + if (deployMode == "cluster") { + hdfs.makeQualified(new Path( + (engineLocation :+ variantJson.getName).mkString(Path.SEPARATOR))). + toString + } else { + variantJson.getCanonicalPath + }, + "--verbosity", + ca.common.verbosity.toString) ++ + ca.common.engineFactory.map( + x => Seq("--engine-factory", x)).getOrElse(Seq()) ++ + ca.common.engineParamsKey.map( + x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++ + (if (deployMode == "cluster") Seq("--deploy-mode", "cluster") else Seq()) ++ + (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ + (if (ca.common.verbose) Seq("--verbose") else Seq()) ++ + (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++ + (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++ + (if (ca.common.stopAfterPrepare) { + Seq("--stop-after-prepare") + } else { + Seq() + }) ++ + ca.common.evaluation.map(x => Seq("--evaluation-class", x)). + getOrElse(Seq()) ++ + // If engineParamsGenerator is specified, it overrides the evaluation. + ca.common.engineParamsGenerator.orElse(ca.common.evaluation) + .map(x => Seq("--engine-params-generator-class", x)) + .getOrElse(Seq()) ++ + (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ + Seq("--json-extractor", ca.common.jsonExtractor.toString) + + info(s"Submission command: ${sparkSubmit.mkString(" ")}") + Process(sparkSubmit, None, "CLASSPATH" -> "", "SPARK_YARN_USER_ENV" -> pioEnvVars).! + } + + def newRunWorkflow(ca: ConsoleArgs, em: EngineManifest): Int = { + val jarFiles = em.files.map(new URI(_)) + val args = Seq( + "--engine-id", + em.id, + "--engine-version", + em.version, + "--engine-variant", + ca.common.variantJson.toURI.toString, + "--verbosity", + ca.common.verbosity.toString) ++ + ca.common.engineFactory.map( + x => Seq("--engine-factory", x)).getOrElse(Seq()) ++ + ca.common.engineParamsKey.map( + x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++ + (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ + (if (ca.common.verbose) Seq("--verbose") else Seq()) ++ + (if (ca.common.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++ + (if (ca.common.stopAfterRead) Seq("--stop-after-read") else Seq()) ++ + (if (ca.common.stopAfterPrepare) { + Seq("--stop-after-prepare") + } else { + Seq() + }) ++ + ca.common.evaluation.map(x => Seq("--evaluation-class", x)). + getOrElse(Seq()) ++ + // If engineParamsGenerator is specified, it overrides the evaluation. + ca.common.engineParamsGenerator.orElse(ca.common.evaluation) + .map(x => Seq("--engine-params-generator-class", x)) + .getOrElse(Seq()) ++ + (if (ca.common.batch != "") Seq("--batch", ca.common.batch) else Seq()) ++ + Seq("--json-extractor", ca.common.jsonExtractor.toString) + + Runner.runOnSpark( + "org.apache.predictionio.workflow.CreateWorkflow", + args, + ca, + jarFiles) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala new file mode 100644 index 0000000..3a8fed5 --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala @@ -0,0 +1,211 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools + +import java.io.File +import java.net.URI + +import grizzled.slf4j.Logging +import org.apache.predictionio.tools.console.ConsoleArgs +import org.apache.predictionio.workflow.WorkflowUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path + +import scala.sys.process._ + +object Runner extends Logging { + def envStringToMap(env: String): Map[String, String] = + env.split(',').flatMap(p => + p.split('=') match { + case Array(k, v) => List(k -> v) + case _ => Nil + } + ).toMap + + def argumentValue(arguments: Seq[String], argumentName: String): Option[String] = { + val argumentIndex = arguments.indexOf(argumentName) + try { + arguments(argumentIndex) // just to make it error out if index is -1 + Some(arguments(argumentIndex + 1)) + } catch { + case e: IndexOutOfBoundsException => None + } + } + + def handleScratchFile( + fileSystem: Option[FileSystem], + uri: Option[URI], + localFile: File): String = { + val localFilePath = localFile.getCanonicalPath + (fileSystem, uri) match { + case (Some(fs), Some(u)) => + val dest = fs.makeQualified(Path.mergePaths( + new Path(u), + new Path(localFilePath))) + info(s"Copying $localFile to ${dest.toString}") + fs.copyFromLocalFile(new Path(localFilePath), dest) + dest.toUri.toString + case _ => localFile.toURI.toString + } + } + + def cleanup(fs: Option[FileSystem], uri: Option[URI]): Unit = { + (fs, uri) match { + case (Some(f), Some(u)) => + f.close() + case _ => Unit + } + } + + def detectFilePaths( + fileSystem: Option[FileSystem], + uri: Option[URI], + args: Seq[String]): Seq[String] = { + args map { arg => + val f = try { + new File(new URI(arg)) + } catch { + case e: Throwable => new File(arg) + } + if (f.exists()) { + handleScratchFile(fileSystem, uri, f) + } else { + arg + } + } + } + + def runOnSpark( + className: String, + classArgs: Seq[String], + ca: ConsoleArgs, + extraJars: Seq[URI]): Int = { + // Return error for unsupported cases + val deployMode = + argumentValue(ca.common.sparkPassThrough, "--deploy-mode").getOrElse("client") + val master = + argumentValue(ca.common.sparkPassThrough, "--master").getOrElse("local") + + (ca.common.scratchUri, deployMode, master) match { + case (Some(u), "client", m) if m != "yarn-cluster" => + error("--scratch-uri cannot be set when deploy mode is client") + return 1 + case (_, "cluster", m) if m.startsWith("spark://") => + error("Using cluster deploy mode with Spark standalone cluster is not supported") + return 1 + case _ => Unit + } + + // Initialize HDFS API for scratch URI + val fs = ca.common.scratchUri map { uri => + FileSystem.get(uri, new Configuration()) + } + + // Collect and serialize PIO_* environmental variables + val pioEnvVars = sys.env.filter(kv => kv._1.startsWith("PIO_")).map(kv => + s"${kv._1}=${kv._2}" + ).mkString(",") + + // Location of Spark + val sparkHome = ca.common.sparkHome.getOrElse( + sys.env.getOrElse("SPARK_HOME", ".")) + + // Local path to PredictionIO assembly JAR + val mainJar = handleScratchFile( + fs, + ca.common.scratchUri, + console.Console.coreAssembly(ca.common.pioHome.get)) + + // Extra JARs that are needed by the driver + val driverClassPathPrefix = + argumentValue(ca.common.sparkPassThrough, "--driver-class-path") map { v => + Seq(v) + } getOrElse { + Nil + } + + val extraClasspaths = + driverClassPathPrefix ++ WorkflowUtils.thirdPartyClasspaths + + // Extra files that are needed to be passed to --files + val extraFiles = WorkflowUtils.thirdPartyConfFiles map { f => + handleScratchFile(fs, ca.common.scratchUri, new File(f)) + } + + val deployedJars = extraJars map { j => + handleScratchFile(fs, ca.common.scratchUri, new File(j)) + } + + val sparkSubmitCommand = + Seq(Seq(sparkHome, "bin", "spark-submit").mkString(File.separator)) + + val sparkSubmitJars = if (extraJars.nonEmpty) { + Seq("--jars", deployedJars.map(_.toString).mkString(",")) + } else { + Nil + } + + val sparkSubmitFiles = if (extraFiles.nonEmpty) { + Seq("--files", extraFiles.mkString(",")) + } else { + Nil + } + + val sparkSubmitExtraClasspaths = if (extraClasspaths.nonEmpty) { + Seq("--driver-class-path", extraClasspaths.mkString(":")) + } else { + Nil + } + + val sparkSubmitKryo = if (ca.common.sparkKryo) { + Seq( + "--conf", + "spark.serializer=org.apache.spark.serializer.KryoSerializer") + } else { + Nil + } + + val verbose = if (ca.common.verbose) Seq("--verbose") else Nil + + val sparkSubmit = Seq( + sparkSubmitCommand, + ca.common.sparkPassThrough, + Seq("--class", className), + sparkSubmitJars, + sparkSubmitFiles, + sparkSubmitExtraClasspaths, + sparkSubmitKryo, + Seq(mainJar), + detectFilePaths(fs, ca.common.scratchUri, classArgs), + Seq("--env", pioEnvVars), + verbose).flatten.filter(_ != "") + info(s"Submission command: ${sparkSubmit.mkString(" ")}") + val proc = Process( + sparkSubmit, + None, + "CLASSPATH" -> "", + "SPARK_YARN_USER_ENV" -> pioEnvVars).run() + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + def run(): Unit = { + cleanup(fs, ca.common.scratchUri) + proc.destroy() + } + })) + cleanup(fs, ca.common.scratchUri) + proc.exitValue() + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala new file mode 100644 index 0000000..b70cb7e --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/AdminAPI.scala @@ -0,0 +1,156 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools.admin + +import akka.actor.{Actor, ActorSystem, Props} +import akka.event.Logging +import akka.io.IO +import akka.util.Timeout +import org.apache.predictionio.data.api.StartServer +import org.apache.predictionio.data.storage.Storage +import org.json4s.{Formats, DefaultFormats} + +import java.util.concurrent.TimeUnit + +import spray.can.Http +import spray.http.{MediaTypes, StatusCodes} +import spray.httpx.Json4sSupport +import spray.routing._ + +import scala.concurrent.ExecutionContext + +class AdminServiceActor(val commandClient: CommandClient) + extends HttpServiceActor { + + object Json4sProtocol extends Json4sSupport { + implicit def json4sFormats: Formats = DefaultFormats + } + + import Json4sProtocol._ + + val log = Logging(context.system, this) + + // we use the enclosing ActorContext's or ActorSystem's dispatcher for our + // Futures + implicit def executionContext: ExecutionContext = actorRefFactory.dispatcher + implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS) + + // for better message response + val rejectionHandler = RejectionHandler { + case MalformedRequestContentRejection(msg, _) :: _ => + complete(StatusCodes.BadRequest, Map("message" -> msg)) + case MissingQueryParamRejection(msg) :: _ => + complete(StatusCodes.NotFound, + Map("message" -> s"missing required query parameter ${msg}.")) + case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => + complete(StatusCodes.Unauthorized, challengeHeaders, + Map("message" -> s"Invalid accessKey.")) + } + + val jsonPath = """(.+)\.json$""".r + + val route: Route = + pathSingleSlash { + get { + respondWithMediaType(MediaTypes.`application/json`) { + complete(Map("status" -> "alive")) + } + } + } ~ + path("cmd" / "app" / Segment / "data") { + appName => { + delete { + respondWithMediaType(MediaTypes.`application/json`) { + complete(commandClient.futureAppDataDelete(appName)) + } + } + } + } ~ + path("cmd" / "app" / Segment) { + appName => { + delete { + respondWithMediaType(MediaTypes.`application/json`) { + complete(commandClient.futureAppDelete(appName)) + } + } + } + } ~ + path("cmd" / "app") { + get { + respondWithMediaType(MediaTypes.`application/json`) { + complete(commandClient.futureAppList()) + } + } ~ + post { + entity(as[AppRequest]) { + appArgs => respondWithMediaType(MediaTypes.`application/json`) { + complete(commandClient.futureAppNew(appArgs)) + } + } + } + } + def receive: Actor.Receive = runRoute(route) +} + +class AdminServerActor(val commandClient: CommandClient) extends Actor { + val log = Logging(context.system, this) + val child = context.actorOf( + Props(classOf[AdminServiceActor], commandClient), + "AdminServiceActor") + + implicit val system = context.system + + def receive: PartialFunction[Any, Unit] = { + case StartServer(host, portNum) => { + IO(Http) ! Http.Bind(child, interface = host, port = portNum) + + } + case m: Http.Bound => log.info("Bound received. AdminServer is ready.") + case m: Http.CommandFailed => log.error("Command failed.") + case _ => log.error("Unknown message.") + } +} + +case class AdminServerConfig( + ip: String = "localhost", + port: Int = 7071 +) + +object AdminServer { + def createAdminServer(config: AdminServerConfig): Unit = { + implicit val system = ActorSystem("AdminServerSystem") + + val commandClient = new CommandClient( + appClient = Storage.getMetaDataApps, + accessKeyClient = Storage.getMetaDataAccessKeys, + eventClient = Storage.getLEvents() + ) + + val serverActor = system.actorOf( + Props(classOf[AdminServerActor], commandClient), + "AdminServerActor") + serverActor ! StartServer(config.ip, config.port) + system.awaitTermination + } +} + +object AdminRun { + def main (args: Array[String]) { + AdminServer.createAdminServer(AdminServerConfig( + ip = "localhost", + port = 7071)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala new file mode 100644 index 0000000..143023e --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala @@ -0,0 +1,160 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools.admin + +import org.apache.predictionio.data.storage._ + +import scala.concurrent.{ExecutionContext, Future} + +abstract class BaseResponse() + +case class GeneralResponse( + status: Int = 0, + message: String = "" +) extends BaseResponse() + +case class AppRequest( + id: Int = 0, + name: String = "", + description: String = "" +) + +case class TrainRequest( + enginePath: String = "" +) +case class AppResponse( + id: Int = 0, + name: String = "", + keys: Seq[AccessKey] +) extends BaseResponse() + +case class AppNewResponse( + status: Int = 0, + message: String = "", + id: Int = 0, + name: String = "", + key: String +) extends BaseResponse() + +case class AppListResponse( + status: Int = 0, + message: String = "", + apps: Seq[AppResponse] +) extends BaseResponse() + +class CommandClient( + val appClient: Apps, + val accessKeyClient: AccessKeys, + val eventClient: LEvents +) { + + def futureAppNew(req: AppRequest)(implicit ec: ExecutionContext): Future[BaseResponse] = Future { + val response = appClient.getByName(req.name) map { app => + GeneralResponse(0, s"App ${req.name} already exists. Aborting.") + } getOrElse { + appClient.get(req.id) map { + app2 => + GeneralResponse(0, + s"App ID ${app2.id} already exists and maps to the app '${app2.name}'. " + + "Aborting.") + } getOrElse { + val appid = appClient.insert(App( + id = Option(req.id).getOrElse(0), + name = req.name, + description = Option(req.description))) + appid map { id => + val dbInit = eventClient.init(id) + val r = if (dbInit) { + val accessKey = AccessKey( + key = "", + appid = id, + events = Seq()) + val accessKey2 = accessKeyClient.insert(AccessKey( + key = "", + appid = id, + events = Seq())) + accessKey2 map { k => + new AppNewResponse(1,"App created successfully.",id, req.name, k) + } getOrElse { + GeneralResponse(0, s"Unable to create new access key.") + } + } else { + GeneralResponse(0, s"Unable to initialize Event Store for this app ID: ${id}.") + } + r + } getOrElse { + GeneralResponse(0, s"Unable to create new app.") + } + } + } + response + } + + def futureAppList()(implicit ec: ExecutionContext): Future[AppListResponse] = Future { + val apps = appClient.getAll().sortBy(_.name) + val appsRes = apps.map { + app => { + new AppResponse(app.id, app.name, accessKeyClient.getByAppid(app.id)) + } + } + new AppListResponse(1, "Successful retrieved app list.", appsRes) + } + + def futureAppDataDelete(appName: String) + (implicit ec: ExecutionContext): Future[GeneralResponse] = Future { + val response = appClient.getByName(appName) map { app => + val data = if (eventClient.remove(app.id)) { + GeneralResponse(1, s"Removed Event Store for this app ID: ${app.id}") + } else { + GeneralResponse(0, s"Error removing Event Store for this app.") + } + + val dbInit = eventClient.init(app.id) + val data2 = if (dbInit) { + GeneralResponse(1, s"Initialized Event Store for this app ID: ${app.id}.") + } else { + GeneralResponse(0, s"Unable to initialize Event Store for this appId:" + + s" ${app.id}.") + } + GeneralResponse(data.status * data2.status, data.message + data2.message) + } getOrElse { + GeneralResponse(0, s"App ${appName} does not exist.") + } + response + } + + def futureAppDelete(appName: String) + (implicit ec: ExecutionContext): Future[GeneralResponse] = Future { + + val response = appClient.getByName(appName) map { app => + val data = if (eventClient.remove(app.id)) { + Storage.getMetaDataApps.delete(app.id) + GeneralResponse(1, s"App successfully deleted") + } else { + GeneralResponse(0, s"Error removing Event Store for app ${app.name}."); + } + data + } getOrElse { + GeneralResponse(0, s"App ${appName} does not exist.") + } + response + } + + def futureTrain(req: TrainRequest) + (implicit ec: ExecutionContext): Future[GeneralResponse] = Future { + null + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md new file mode 100644 index 0000000..475a3de --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/README.md @@ -0,0 +1,161 @@ +## Admin API (under development) + +### Start Admin HTTP Server without bin/pio (for development) + +NOTE: elasticsearch and hbase should be running first. + +``` +$ sbt/sbt "tools/compile" +$ set -a +$ source conf/pio-env.sh +$ set +a +$ sbt/sbt "tools/run-main io.prediction.tools.admin.AdminRun" +``` + +### Unit test (Very minimal) + +``` +$ set -a +$ source conf/pio-env.sh +$ set +a +$ sbt/sbt "tools/test-only io.prediction.tools.admin.AdminAPISpec" +``` + +### Start with pio command adminserver + +``` +$ pio adminserver +``` + +Admin Server url defaults to `http://localhost:7071` + +The host and port can be specified by using the 'ip' and 'port' parameters + +``` +$ pio adminserver --ip 127.0.0.1 --port 7080 +``` + +### Current Supported Commands + +#### Check status + +``` +$ curl -i http://localhost:7071/ + +{"status":"alive"} +``` + +#### Get list of apps + +``` +$ curl -i -X GET http://localhost:7071/cmd/app + +{"status":1,"message":"Successful retrieved app list.","apps":[{"id":12,"name":"scratch","keys":[{"key":"gtPgVMIr3uthus1QJWFBcIjNf6d1SNuhaOWQAgdLbOBP1eRWMNIJWl6SkHgI1OoN","appid":12,"events":[]}]},{"id":17,"name":"test-ecommercerec","keys":[{"key":"zPkr6sBwQoBwBjVHK2hsF9u26L38ARSe19QzkdYentuomCtYSuH0vXP5fq7advo4","appid":17,"events":[]}]}]} +``` + +#### Create a new app + +``` +$ curl -i -X POST http://localhost:7071/cmd/app \ +-H "Content-Type: application/json" \ +-d '{ "name" : "my_new_app" }' + +{"status":1,"message":"App created successfully.","id":19,"name":"my_new_app","keys":[{"key":"","appid":19,"events":[]}]} +``` + +#### Delete data of app + +``` +$ curl -i -X DELETE http://localhost:7071/cmd/app/my_new_app/data +``` + +#### Delete app + +``` +$ curl -i -X DELETE http://localhost:7071/cmd/app/my_new_app + +{"status":1,"message":"App successfully deleted"} +``` + + +## API Doc (To be updated) + +### app list: +GET http://localhost:7071/cmd/app + +OK Response: +{ + âstatusâ: <STATUS>, + âmessageâ: <MESSAGE>, + âappsâ : [ + { âname': â<APP_NAME>â, + âid': <APP_ID>, + âaccessKey' : â<ACCESS_KEY>â }, + { âname': â<APP_NAME>â, + âid': <APP_ID>, + âaccessKey' : â<ACCESS_KEY>â }, ... ] +} + +Error Response: +{âstatusâ: <STATUS>, âmessageâ : â<MESSAGE>â} + +### app new +POST http://localhost:7071/cmd/app +Request Body: +{ nameâ: â<APP_NAME>â, // required + âidâ: <APP_ID>, // optional + âdescriptionâ: â<DESCRIPTION>â } // optional + +OK Response: +{ âstatusâ: <STATUS>, + âmessageâ: <MESSAGE>, + âappâ : { + ânameâ: â<APP_NAME>â, + âidâ: <APP_ID>, + âaccessKeyâ : â<ACCESS_KEY>â } +} + +Error Response: +{ âstatusâ: <STATUS>, âmessageâ : â<MESSAGE>â} + +### app delete +DELETE http://localhost:7071/cmd/app/{appName} + +OK Response: +{ "status": <STATUS>, "message" : â<MESSAGE>â} + +Error Response: +{ âstatusâ: <STATUS>, âmessageâ : â<MESSAGE>â} + +### app data-delete +DELETE http://localhost:7071/cmd/app/{appName}/data + +OK Response: +{ "status": <STATUS>, "message" : â<MESSAGE>â} + +Error Response: +{ âstatusâ: <STATUS>, âmessageâ : â<MESSAGE>â } + + +### train TBD + +#### Training request: +POST http://localhost:7071/cmd/train +Request body: TBD + +OK Response: TBD + +Error Response: TBD + +#### Get training status: +GET http://localhost:7071/cmd/train/{engineInstanceId} + +OK Response: TBD +INIT +TRAINING +DONE +ERROR + +Error Response: TBD + +### deploy TBD http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala new file mode 100644 index 0000000..a6ab83c --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/AccessKey.scala @@ -0,0 +1,83 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools.console + +import org.apache.predictionio.data.storage + +import grizzled.slf4j.Logging + +case class AccessKeyArgs( + accessKey: String = "", + events: Seq[String] = Seq()) + +object AccessKey extends Logging { + def create(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps + apps.getByName(ca.app.name) map { app => + val accessKeys = storage.Storage.getMetaDataAccessKeys + val accessKey = accessKeys.insert(storage.AccessKey( + key = ca.accessKey.accessKey, + appid = app.id, + events = ca.accessKey.events)) + accessKey map { k => + info(s"Created new access key: ${k}") + 0 + } getOrElse { + error(s"Unable to create new access key.") + 1 + } + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + 1 + } + } + + def list(ca: ConsoleArgs): Int = { + val keys = + if (ca.app.name == "") { + storage.Storage.getMetaDataAccessKeys.getAll + } else { + val apps = storage.Storage.getMetaDataApps + apps.getByName(ca.app.name) map { app => + storage.Storage.getMetaDataAccessKeys.getByAppid(app.id) + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + return 1 + } + } + val title = "Access Key(s)" + info(f"$title%64s | App ID | Allowed Event(s)") + keys.sortBy(k => k.appid) foreach { k => + val events = + if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" + info(f"${k.key}%64s | ${k.appid}%6d | $events%s") + } + info(s"Finished listing ${keys.size} access key(s).") + 0 + } + + def delete(ca: ConsoleArgs): Int = { + try { + storage.Storage.getMetaDataAccessKeys.delete(ca.accessKey.accessKey) + info(s"Deleted access key ${ca.accessKey.accessKey}.") + 0 + } catch { + case e: Exception => + error(s"Error deleting access key ${ca.accessKey.accessKey}.", e) + 1 + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala new file mode 100644 index 0000000..cc2f36d --- /dev/null +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/App.scala @@ -0,0 +1,537 @@ +/** Copyright 2015 TappingStone, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.tools.console + +import org.apache.predictionio.data.storage + +import grizzled.slf4j.Logging + +case class AppArgs( + id: Option[Int] = None, + name: String = "", + channel: String = "", + dataDeleteChannel: Option[String] = None, + all: Boolean = false, + force: Boolean = false, + description: Option[String] = None) + +object App extends Logging { + def create(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps() + // get the client in the beginning so error exit right away if can't access client + val events = storage.Storage.getLEvents() + apps.getByName(ca.app.name) map { app => + error(s"App ${ca.app.name} already exists. Aborting.") + 1 + } getOrElse { + ca.app.id.map { id => + apps.get(id) map { app => + error( + s"App ID ${id} already exists and maps to the app '${app.name}'. " + + "Aborting.") + return 1 + } + } + val appid = apps.insert(storage.App( + id = ca.app.id.getOrElse(0), + name = ca.app.name, + description = ca.app.description)) + appid map { id => + val dbInit = events.init(id) + val r = if (dbInit) { + info(s"Initialized Event Store for this app ID: ${id}.") + val accessKeys = storage.Storage.getMetaDataAccessKeys + val accessKey = accessKeys.insert(storage.AccessKey( + key = ca.accessKey.accessKey, + appid = id, + events = Seq())) + accessKey map { k => + info("Created new app:") + info(s" Name: ${ca.app.name}") + info(s" ID: ${id}") + info(s"Access Key: ${k}") + 0 + } getOrElse { + error(s"Unable to create new access key.") + 1 + } + } else { + error(s"Unable to initialize Event Store for this app ID: ${id}.") + // revert back the meta data change + try { + apps.delete(id) + 0 + } catch { + case e: Exception => + error(s"Failed to revert back the App meta-data change.", e) + error(s"The app ${ca.app.name} CANNOT be used!") + error(s"Please run 'pio app delete ${ca.app.name}' " + + "to delete this app!") + 1 + } + } + events.close() + r + } getOrElse { + error(s"Unable to create new app.") + 1 + } + } + } + + def list(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps.getAll().sortBy(_.name) + val accessKeys = storage.Storage.getMetaDataAccessKeys + val title = "Name" + val ak = "Access Key" + info(f"$title%20s | ID | $ak%64s | Allowed Event(s)") + apps foreach { app => + val keys = accessKeys.getByAppid(app.id) + keys foreach { k => + val events = + if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" + info(f"${app.name}%20s | ${app.id}%4d | ${k.key}%64s | $events%s") + } + } + info(s"Finished listing ${apps.size} app(s).") + 0 + } + + def show(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps + val accessKeys = storage.Storage.getMetaDataAccessKeys + val channels = storage.Storage.getMetaDataChannels + apps.getByName(ca.app.name) map { app => + info(s" App Name: ${app.name}") + info(s" App ID: ${app.id}") + info(s" Description: ${app.description.getOrElse("")}") + val keys = accessKeys.getByAppid(app.id) + + var firstKey = true + keys foreach { k => + val events = + if (k.events.size > 0) k.events.sorted.mkString(",") else "(all)" + if (firstKey) { + info(f" Access Key: ${k.key}%s | ${events}%s") + firstKey = false + } else { + info(f" ${k.key}%s | ${events}%s") + } + } + + val chans = channels.getByAppid(app.id) + var firstChan = true + val titleName = "Channel Name" + val titleID = "Channel ID" + chans.foreach { ch => + if (firstChan) { + info(f" Channels: ${titleName}%16s | ${titleID}%10s ") + firstChan = false + } + info(f" ${ch.name}%16s | ${ch.id}%10s") + } + 0 + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + 1 + } + } + + def delete(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps + val accesskeys = storage.Storage.getMetaDataAccessKeys + val channels = storage.Storage.getMetaDataChannels + val events = storage.Storage.getLEvents() + val status = apps.getByName(ca.app.name) map { app => + info(s"The following app (including all channels) will be deleted. Are you sure?") + info(s" App Name: ${app.name}") + info(s" App ID: ${app.id}") + info(s" Description: ${app.description.getOrElse("")}") + val chans = channels.getByAppid(app.id) + var firstChan = true + val titleName = "Channel Name" + val titleID = "Channel ID" + chans.foreach { ch => + if (firstChan) { + info(f" Channels: ${titleName}%16s | ${titleID}%10s ") + firstChan = false + } + info(f" ${ch.name}%16s | ${ch.id}%10s") + } + + val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") + choice match { + case "YES" => { + // delete channels + val delChannelStatus: Seq[Int] = chans.map { ch => + if (events.remove(app.id, Some(ch.id))) { + info(s"Removed Event Store of the channel ID: ${ch.id}") + try { + channels.delete(ch.id) + info(s"Deleted channel ${ch.name}") + 0 + } catch { + case e: Exception => + error(s"Error deleting channel ${ch.name}.", e) + 1 + } + } else { + error(s"Error removing Event Store of the channel ID: ${ch.id}.") + return 1 + } + } + + if (delChannelStatus.exists(_ != 0)) { + error("Error occurred while deleting channels. Aborting.") + return 1 + } + + try { + events.remove(app.id) + info(s"Removed Event Store for this app ID: ${app.id}") + } catch { + case e: Exception => + error(s"Error removing Event Store for this app. Aborting.", e) + return 1 + } + + accesskeys.getByAppid(app.id) foreach { key => + try { + accesskeys.delete(key.key) + info(s"Removed access key ${key.key}") + } catch { + case e: Exception => + error(s"Error removing access key ${key.key}. Aborting.", e) + return 1 + } + } + + try { + apps.delete(app.id) + info(s"Deleted app ${app.name}.") + } catch { + case e: Exception => + error(s"Error deleting app ${app.name}. Aborting.", e) + return 1 + } + + info("Done.") + 0 + } + case _ => + info("Aborted.") + 0 + } + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + 1 + } + events.close() + status + } + + def dataDelete(ca: ConsoleArgs): Int = { + if (ca.app.all) { + dataDeleteAll(ca) + } else { + dataDeleteOne(ca) + } + } + + def dataDeleteOne(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps + val channels = storage.Storage.getMetaDataChannels + apps.getByName(ca.app.name) map { app => + + val channelId = ca.app.dataDeleteChannel.map { ch => + val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap + if (!channelMap.contains(ch)) { + error(s"Unable to delete data for channel.") + error(s"Channel ${ch} doesn't exist.") + return 1 + } + + channelMap(ch) + } + + if (channelId.isDefined) { + info(s"Data of the following channel will be deleted. Are you sure?") + info(s"Channel Name: ${ca.app.dataDeleteChannel.get}") + info(s" Channel ID: ${channelId.get}") + info(s" App Name: ${app.name}") + info(s" App ID: ${app.id}") + info(s" Description: ${app.description}") + } else { + info(s"Data of the following app (default channel only) will be deleted. Are you sure?") + info(s" App Name: ${app.name}") + info(s" App ID: ${app.id}") + info(s" Description: ${app.description}") + } + + val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") + + choice match { + case "YES" => { + val events = storage.Storage.getLEvents() + // remove table + val r1 = if (events.remove(app.id, channelId)) { + if (channelId.isDefined) { + info(s"Removed Event Store for this channel ID: ${channelId.get}") + } else { + info(s"Removed Event Store for this app ID: ${app.id}") + } + 0 + } else { + if (channelId.isDefined) { + error(s"Error removing Event Store for this channel.") + } else { + error(s"Error removing Event Store for this app.") + } + 1 + } + // re-create table + val dbInit = events.init(app.id, channelId) + val r2 = if (dbInit) { + if (channelId.isDefined) { + info(s"Initialized Event Store for this channel ID: ${channelId.get}.") + } else { + info(s"Initialized Event Store for this app ID: ${app.id}.") + } + 0 + } else { + if (channelId.isDefined) { + error(s"Unable to initialize Event Store for this channel ID:" + + s" ${channelId.get}.") + } else { + error(s"Unable to initialize Event Store for this appId:" + + s" ${app.id}.") + } + 1 + } + events.close() + info("Done.") + r1 + r2 + } + case _ => + info("Aborted.") + 0 + } + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + 1 + } + } + + def dataDeleteAll(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps + val channels = storage.Storage.getMetaDataChannels + val events = storage.Storage.getLEvents() + val status = apps.getByName(ca.app.name) map { app => + info(s"All data of the app (including default and all channels) will be deleted." + + " Are you sure?") + info(s" App Name: ${app.name}") + info(s" App ID: ${app.id}") + info(s" Description: ${app.description}") + val chans = channels.getByAppid(app.id) + var firstChan = true + val titleName = "Channel Name" + val titleID = "Channel ID" + chans.foreach { ch => + if (firstChan) { + info(f" Channels: ${titleName}%16s | ${titleID}%10s ") + firstChan = false + } + info(f" ${ch.name}%16s | ${ch.id}%10s") + } + + val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") + choice match { + case "YES" => { + // delete channels + val delChannelStatus: Seq[Int] = chans.map { ch => + val r1 = if (events.remove(app.id, Some(ch.id))) { + info(s"Removed Event Store of the channel ID: ${ch.id}") + 0 + } else { + error(s"Error removing Event Store of the channel ID: ${ch.id}.") + 1 + } + // re-create table + val dbInit = events.init(app.id, Some(ch.id)) + val r2 = if (dbInit) { + info(s"Initialized Event Store of the channel ID: ${ch.id}") + 0 + } else { + error(s"Unable to initialize Event Store of the channel ID: ${ch.id}.") + 1 + } + r1 + r2 + } + + if (delChannelStatus.filter(_ != 0).isEmpty) { + val r1 = if (events.remove(app.id)) { + info(s"Removed Event Store for this app ID: ${app.id}") + 0 + } else { + error(s"Error removing Event Store for this app.") + 1 + } + + val dbInit = events.init(app.id) + val r2 = if (dbInit) { + info(s"Initialized Event Store for this app ID: ${app.id}.") + 0 + } else { + error(s"Unable to initialize Event Store for this appId: ${app.id}.") + 1 + } + info("Done.") + r1 + r2 + } else 1 + } + case _ => + info("Aborted.") + 0 + } + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + 1 + } + events.close() + status + } + + def channelNew(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps + val channels = storage.Storage.getMetaDataChannels + val events = storage.Storage.getLEvents() + val newChannel = ca.app.channel + val status = apps.getByName(ca.app.name) map { app => + val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap + if (channelMap.contains(newChannel)) { + error(s"Unable to create new channel.") + error(s"Channel ${newChannel} already exists.") + 1 + } else if (!storage.Channel.isValidName(newChannel)) { + error(s"Unable to create new channel.") + error(s"The channel name ${newChannel} is invalid.") + error(s"${storage.Channel.nameConstraint}") + 1 + } else { + + val channelId = channels.insert(storage.Channel( + id = 0, // new id will be assigned + appid = app.id, + name = newChannel + )) + channelId.map { chanId => + info(s"Updated Channel meta-data.") + // initialize storage + val dbInit = events.init(app.id, Some(chanId)) + if (dbInit) { + info(s"Initialized Event Store for the channel: ${newChannel}.") + info(s"Created new channel:") + info(s" Channel Name: ${newChannel}") + info(s" Channel ID: ${chanId}") + info(s" App ID: ${app.id}") + 0 + } else { + error(s"Unable to create new channel.") + error(s"Failed to initalize Event Store.") + // reverted back the meta data + try { + channels.delete(chanId) + 0 + } catch { + case e: Exception => + error(s"Failed to revert back the Channel meta-data change.", e) + error(s"The channel ${newChannel} CANNOT be used!") + error(s"Please run 'pio app channel-delete ${app.name} ${newChannel}' " + + "to delete this channel!") + 1 + } + } + }.getOrElse { + error(s"Unable to create new channel.") + error(s"Failed to update Channel meta-data.") + 1 + } + } + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + 1 + } + events.close() + status + } + + def channelDelete(ca: ConsoleArgs): Int = { + val apps = storage.Storage.getMetaDataApps + val channels = storage.Storage.getMetaDataChannels + val events = storage.Storage.getLEvents() + val deleteChannel = ca.app.channel + val status = apps.getByName(ca.app.name) map { app => + val channelMap = channels.getByAppid(app.id).map(c => (c.name, c.id)).toMap + if (!channelMap.contains(deleteChannel)) { + error(s"Unable to delete channel.") + error(s"Channel ${deleteChannel} doesn't exist.") + 1 + } else { + info(s"The following channel will be deleted. Are you sure?") + info(s" Channel Name: ${deleteChannel}") + info(s" Channel ID: ${channelMap(deleteChannel)}") + info(s" App Name: ${app.name}") + info(s" App ID: ${app.id}") + val choice = if(ca.app.force) "YES" else readLine("Enter 'YES' to proceed: ") + choice match { + case "YES" => { + // NOTE: remove storage first before remove meta data (in case remove storage failed) + val dbRemoved = events.remove(app.id, Some(channelMap(deleteChannel))) + if (dbRemoved) { + info(s"Removed Event Store for this channel: ${deleteChannel}") + try { + channels.delete(channelMap(deleteChannel)) + info(s"Deleted channel: ${deleteChannel}.") + 0 + } catch { + case e: Exception => + error(s"Unable to delete channel.", e) + error(s"Failed to update Channel meta-data.") + error(s"The channel ${deleteChannel} CANNOT be used!") + error(s"Please run 'pio app channel-delete ${app.name} ${deleteChannel}' " + + "to delete this channel again!") + 1 + } + } else { + error(s"Unable to delete channel.") + error(s"Error removing Event Store for this channel.") + 1 + } + } + case _ => + info("Aborted.") + 0 + } + } + } getOrElse { + error(s"App ${ca.app.name} does not exist. Aborting.") + 1 + } + events.close() + status + } + +}
