[PIO-110] Refactoring Closes #425
Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/6789dbeb Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/6789dbeb Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/6789dbeb Branch: refs/heads/master Commit: 6789dbeb71b1cc7f13a385032da6fbc3b8cf7a12 Parents: 3856e5c Author: Naoki Takezoe <[email protected]> Authored: Mon Aug 28 19:31:58 2017 +0900 Committer: Naoki Takezoe <[email protected]> Committed: Mon Aug 28 19:31:58 2017 +0900 ---------------------------------------------------------------------- .../authentication/KeyAuthentication.scala | 4 +- .../apache/predictionio/controller/Engine.scala | 41 ++++++----- .../predictionio/controller/EngineParams.scala | 4 +- .../controller/FastEvalEngine.scala | 4 +- .../controller/IdentityPreparator.scala | 2 - .../predictionio/controller/LAlgorithm.scala | 15 ++-- .../controller/MetricEvaluator.scala | 10 +-- .../predictionio/controller/P2LAlgorithm.scala | 15 ++-- .../predictionio/controller/PAlgorithm.scala | 15 ++-- .../core/SelfCleaningDataSource.scala | 24 +++---- .../predictionio/workflow/BatchPredict.scala | 11 ++- .../predictionio/workflow/CreateServer.scala | 24 +++---- .../predictionio/workflow/CreateWorkflow.scala | 8 +-- .../workflow/EngineServerPluginContext.scala | 4 +- .../predictionio/workflow/WorkflowContext.scala | 4 +- .../predictionio/workflow/WorkflowUtils.scala | 17 ++--- .../predictionio/workflow/index.scala.html | 14 ++-- .../predictionio/controller/EngineTest.scala | 2 +- .../apache/predictionio/data/api/Common.scala | 1 - .../predictionio/data/api/EventServer.scala | 4 +- .../data/api/EventServerPluginContext.scala | 4 +- .../apache/predictionio/data/api/Stats.scala | 2 +- .../apache/predictionio/data/api/Webhooks.scala | 27 ++------ .../data/storage/EngineInstances.scala | 4 +- .../data/storage/EvaluationInstances.scala | 4 +- .../data/storage/EventJson4sSupport.scala | 3 +- .../data/storage/PEventAggregator.scala | 2 - .../predictionio/data/storage/Storage.scala | 4 +- .../predictionio/data/view/LBatchView.scala | 1 - .../predictionio/data/view/QuickTest.scala | 4 -- .../e2/engine/BinaryVectorizer.scala | 1 - .../e2/engine/CategoricalNaiveBayes.scala | 1 - .../predictionio/e2/engine/MarkovChain.scala | 1 - .../data/storage/elasticsearch/ESApps.scala | 2 +- .../storage/elasticsearch/ESEventsUtil.scala | 2 +- .../data/storage/elasticsearch/ESLEvents.scala | 30 +------- .../data/storage/elasticsearch/ESPEvents.scala | 3 +- .../data/storage/elasticsearch/ESUtils.scala | 15 ++-- .../storage/elasticsearch/ESAccessKeys.scala | 2 +- .../data/storage/elasticsearch/ESApps.scala | 2 +- .../elasticsearch/ESEngineInstances.scala | 6 +- .../elasticsearch/ESEvaluationInstances.scala | 4 +- .../data/storage/hbase/HBEventsUtil.scala | 41 +++++------ .../data/storage/hbase/upgrade/HB_0_8_0.scala | 2 +- .../predictionio/tools/RunBatchPredict.scala | 4 +- .../apache/predictionio/tools/RunWorkflow.scala | 20 +++--- .../org/apache/predictionio/tools/Runner.scala | 2 +- .../tools/admin/CommandClient.scala | 4 +- .../predictionio/tools/commands/App.scala | 72 ++++++++++---------- .../tools/commands/Management.scala | 2 +- .../predictionio/tools/console/Console.scala | 8 +-- 51 files changed, 207 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala ---------------------------------------------------------------------- diff --git a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala index 3ebc0b4..fa950aa 100644 --- a/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala +++ b/common/src/main/scala/org/apache/predictionio/authentication/KeyAuthentication.scala @@ -49,12 +49,12 @@ trait KeyAuthentication { val passedKey = accessKeyParamOpt.getOrElse { Left(AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsRejected, List())) + AuthenticationFailedRejection.CredentialsRejected, Nil)) } if (!ServerKey.authEnforced || passedKey.equals(ServerKey.get)) Right(ctx.request) else Left(AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsRejected, List())) + AuthenticationFailedRejection.CredentialsRejected, Nil)) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/Engine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/Engine.scala b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala index 436c542..1f9d0ab 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/Engine.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/Engine.scala @@ -38,7 +38,6 @@ import org.apache.predictionio.workflow.StopAfterReadInterruption import org.apache.predictionio.workflow.WorkflowParams import org.apache.predictionio.workflow.WorkflowUtils import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.json4s._ import org.json4s.native.JsonMethods._ @@ -255,13 +254,12 @@ class Engine[TD, EI, PD, Q, P, A]( s"Loaded model ${m.getClass.getName} for algorithm " + s"${algo.getClass.getName}") sc.stop - m } catch { case e: NullPointerException => logger.warn( s"Null model detected for algorithm ${algo.getClass.getName}") - m } + m } } // model match } @@ -692,7 +690,7 @@ object Engine { val models: Seq[Any] = algorithmList.map(_.trainBase(sc, pd)) if (!params.skipSanityCheck) { - models.foreach { model => { + models.foreach { model => model match { case sanityCheckable: SanityCheck => { logger.info(s"${model.getClass.getName} supports data sanity" + @@ -704,7 +702,7 @@ object Engine { " data sanity check. Skipping check.") } } - }} + } } logger.info("EngineWorkflow.train completed") @@ -758,49 +756,49 @@ object Engine { .mapValues(_._3) .mapValues{ _.zipWithUniqueId().map(_.swap) } - val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td => { + val preparedMap: Map[EX, PD] = evalTrainMap.mapValues { td => preparator.prepareBase(sc, td) - }} + } - val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd => { + val algoModelsMap: Map[EX, Map[AX, Any]] = preparedMap.mapValues { pd => algoMap.mapValues(_.trainBase(sc,pd)) - }} + } val suppQAsMap: Map[EX, RDD[(QX, (Q, A))]] = evalQAsMap.mapValues { qas => qas.map { case (qx, (q, a)) => (qx, (serving.supplementBase(q), a)) } } val algoPredictsMap: Map[EX, RDD[(QX, Seq[P])]] = (0 until evalCount) - .map { ex => { + .map { ex => val modelMap: Map[AX, Any] = algoModelsMap(ex) val qs: RDD[(QX, Q)] = suppQAsMap(ex).mapValues(_._1) val algoPredicts: Seq[RDD[(QX, (AX, P))]] = (0 until algoCount) - .map { ax => { + .map { ax => val algo = algoMap(ax) val model = modelMap(ax) val rawPredicts: RDD[(QX, P)] = algo.batchPredictBase(sc, model, qs) - val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { case (qx, p) => { + val predicts: RDD[(QX, (AX, P))] = rawPredicts.map { case (qx, p) => (qx, (ax, p)) - }} + } predicts - }} + } val unionAlgoPredicts: RDD[(QX, Seq[P])] = sc.union(algoPredicts) .groupByKey() - .mapValues { ps => { + .mapValues { ps => assert (ps.size == algoCount, "Must have same length as algoCount") // TODO. Check size == algoCount ps.toSeq.sortBy(_._1).map(_._2) - }} + } (ex, unionAlgoPredicts) - }} + } .toMap val servingQPAMap: Map[EX, RDD[(Q, P, A)]] = algoPredictsMap - .map { case (ex, psMap) => { + .map { case (ex, psMap) => // The query passed to serving.serve is the original one, not // supplemented. val qasMap: RDD[(QX, (Q, A))] = evalQAsMap(ex) @@ -811,12 +809,11 @@ object Engine { case (qx, q, ps, a) => (q, serving.serveBase(q, ps), a) } (ex, qpaMap) - }} + } - (0 until evalCount).map { ex => { + (0 until evalCount).map { ex => (evalInfoMap(ex), servingQPAMap(ex)) - }} - .toSeq + } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala index 6dccd4a..8068eaa 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/EngineParams.scala @@ -35,7 +35,7 @@ import scala.language.implicitConversions class EngineParams( val dataSourceParams: (String, Params) = ("", EmptyParams()), val preparatorParams: (String, Params) = ("", EmptyParams()), - val algorithmParamsList: Seq[(String, Params)] = Seq(), + val algorithmParamsList: Seq[(String, Params)] = Nil, val servingParams: (String, Params) = ("", EmptyParams())) extends Serializable { @@ -102,7 +102,7 @@ object EngineParams { dataSourceParams: Params = EmptyParams(), preparatorName: String = "", preparatorParams: Params = EmptyParams(), - algorithmParamsList: Seq[(String, Params)] = Seq(), + algorithmParamsList: Seq[(String, Params)] = Nil, servingName: String = "", servingParams: Params = EmptyParams()): EngineParams = { new EngineParams( http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala index e046b62..d128776 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/FastEvalEngine.scala @@ -270,10 +270,10 @@ object FastEvalEngineWorkflow { workflow: FastEvalEngineWorkflow[TD, EI, PD, Q, P, A], engineParamsList: Seq[EngineParams]) : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = { - engineParamsList.map { engineParams => { + engineParamsList.map { engineParams => (engineParams, getServingResult(workflow, new ServingPrefix(engineParams))) - }} + } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala index a82f493..8256142 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/IdentityPreparator.scala @@ -22,8 +22,6 @@ import org.apache.predictionio.core.BaseDataSource import org.apache.predictionio.core.BasePreparator import org.apache.spark.SparkContext -import scala.reflect._ - /** A helper concrete implementation of [[org.apache.predictionio.core.BasePreparator]] * that passes training data through without any special preparation. This can * be used in place for both [[PPreparator]] and [[LPreparator]]. http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala index 9e973e4..27d1d14 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/LAlgorithm.scala @@ -119,15 +119,12 @@ abstract class LAlgorithm[PD, M : ClassTag, Q, P] bm: Any): Any = { // Check RDD[M].count == 1 val m = bm.asInstanceOf[RDD[M]].first() - if (m.isInstanceOf[PersistentModel[_]]) { - if (m.asInstanceOf[PersistentModel[Params]].save( - modelId, algoParams, sc)) { - PersistentModelManifest(className = m.getClass.getName) - } else { - () - } - } else { - m + m match { + case m: PersistentModel[Params] @unchecked => + if(m.save(modelId, algoParams, sc)){ + PersistentModelManifest(className = m.getClass.getName) + } else () + case _ => m } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala index 73ecbe4..fc5ec15 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/MetricEvaluator.scala @@ -222,9 +222,8 @@ class MetricEvaluator[EI, Q, P, A, R] ( params: WorkflowParams): MetricEvaluatorResult[R] = { val evalResultList: Seq[(EngineParams, MetricScores[R])] = engineEvalDataSet - .zipWithIndex .par - .map { case ((engineParams, evalDataSet), idx) => + .map { case (engineParams, evalDataSet) => val metricScores = MetricScores[R]( metric.calculate(sc, evalDataSet), otherMetrics.map(_.calculate(sc, evalDataSet))) @@ -235,15 +234,16 @@ class MetricEvaluator[EI, Q, P, A, R] ( implicit lazy val formats = Utils.json4sDefaultFormats + new NameParamsSerializer - evalResultList.zipWithIndex.foreach { case ((ep, r), idx) => + val evalResultListWithIndex = evalResultList.zipWithIndex + + evalResultListWithIndex.foreach { case ((ep, r), idx) => logger.info(s"Iteration $idx") logger.info(s"EngineParams: ${JsonExtractor.engineParamsToJson(Both, ep)}") logger.info(s"Result: $r") } // use max. take implicit from Metric. - val ((bestEngineParams, bestScore), bestIdx) = evalResultList - .zipWithIndex + val ((bestEngineParams, bestScore), bestIdx) = evalResultListWithIndex .reduce { (x, y) => if (metric.compare(x._1._2.score, y._1._2.score) >= 0) x else y } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala index ede8dc2..c617d2c 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/P2LAlgorithm.scala @@ -110,15 +110,12 @@ abstract class P2LAlgorithm[PD, M: ClassTag, Q: ClassTag, P] algoParams: Params, bm: Any): Any = { val m = bm.asInstanceOf[M] - if (m.isInstanceOf[PersistentModel[_]]) { - if (m.asInstanceOf[PersistentModel[Params]].save( - modelId, algoParams, sc)) { - PersistentModelManifest(className = m.getClass.getName) - } else { - () - } - } else { - m + m match { + case m: PersistentModel[Params] @unchecked => + if(m.save(modelId, algoParams, sc)){ + PersistentModelManifest(className = m.getClass.getName) + } else () + case _ => m } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala index 3419de3..55f8363 100644 --- a/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala +++ b/core/src/main/scala/org/apache/predictionio/controller/PAlgorithm.scala @@ -115,15 +115,12 @@ abstract class PAlgorithm[PD, M, Q, P] algoParams: Params, bm: Any): Any = { val m = bm.asInstanceOf[M] - if (m.isInstanceOf[PersistentModel[_]]) { - if (m.asInstanceOf[PersistentModel[Params]].save( - modelId, algoParams, sc)) { - PersistentModelManifest(className = m.getClass.getName) - } else { - () - } - } else { - () + m match { + case m: PersistentModel[Params] @unchecked => + if(m.save(modelId, algoParams, sc)){ + PersistentModelManifest(className = m.getClass.getName) + } else () + case _ => () } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala index 3520d80..cadf6b8 100644 --- a/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala +++ b/core/src/main/scala/org/apache/predictionio/core/SelfCleaningDataSource.scala @@ -93,7 +93,6 @@ trait SelfCleaningDataSource { */ @DeveloperApi def getCleanedLEvents(lEvents: Iterable[Event]): Iterable[Event] = { - eventWindow .flatMap(_.duration) .map { duration => @@ -101,7 +100,7 @@ trait SelfCleaningDataSource { lEvents.filter(e => e.eventTime.isAfter(DateTime.now().minus(fd.toMillis)) || isSetEvent(e) ) - }.getOrElse(lEvents).toIterable + }.getOrElse(lEvents) } def compressPProperties(sc: SparkContext, rdd: RDD[Event]): RDD[Event] = { @@ -117,7 +116,7 @@ trait SelfCleaningDataSource { } def compressLProperties(events: Iterable[Event]): Iterable[Event] = { - events.filter(isSetEvent).toIterable + events.filter(isSetEvent) .groupBy(_.entityType) .map { pair => val (_, ls) = pair @@ -164,7 +163,7 @@ trait SelfCleaningDataSource { val result = cleanPEvents(sc) val originalEvents = PEventStore.find(appName)(sc) val newEvents = result subtract originalEvents - val eventsToRemove = (originalEvents subtract result).map { case e => + val eventsToRemove = (originalEvents subtract result).map { e => e.eventId.getOrElse("") } @@ -187,7 +186,7 @@ trait SelfCleaningDataSource { def removeEvents(eventsToRemove: Set[String], appId: Int) { val listOfFuture: List[Future[Boolean]] = eventsToRemove - .filter(x => x != "").toList.map { case eventId => + .filter(x => x != "").toList.map { eventId => lEventsDb.futureDelete(eventId, appId) } @@ -202,9 +201,8 @@ trait SelfCleaningDataSource { /** Replace events in Event Store * - * @param events new events - * @param appId delete all events of appId - * @param channelId delete all events of channelId + * @param newEvents new events + * @param eventsToRemove event ids to remove */ def wipe( newEvents: Set[Event], @@ -212,7 +210,7 @@ trait SelfCleaningDataSource { ): Unit = { val (appId, channelId) = Common.appNameToId(appName, None) - val listOfFutureNewEvents: List[Future[String]] = newEvents.toList.map { case event => + val listOfFutureNewEvents: List[Future[String]] = newEvents.toList.map { event => lEventsDb.futureInsert(recreateEvent(event, None, event.eventTime), appId) } @@ -233,10 +231,10 @@ trait SelfCleaningDataSource { val rdd = eventWindow match { case Some(ew) => - var updated = + val updated = if (ew.compressProperties) compressPProperties(sc, pEvents) else pEvents - val deduped = if (ew.removeDuplicates) removePDuplicates(sc,updated) else updated + val deduped = if (ew.removeDuplicates) removePDuplicates(sc, updated) else updated deduped case None => pEvents @@ -258,7 +256,7 @@ trait SelfCleaningDataSource { val result = cleanLEvents().toSet val originalEvents = LEventStore.find(appName).toSet val newEvents = result -- originalEvents - val eventsToRemove = (originalEvents -- result).map { case e => + val eventsToRemove = (originalEvents -- result).map { e => e.eventId.getOrElse("") } @@ -278,7 +276,7 @@ trait SelfCleaningDataSource { val events = eventWindow match { case Some(ew) => - var updated = + val updated = if (ew.compressProperties) compressLProperties(lEvents) else lEvents val deduped = if (ew.removeDuplicates) removeLDuplicates(updated) else updated deduped http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala index 2fb0545..5420638 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/BatchPredict.scala @@ -134,12 +134,11 @@ object BatchPredict extends Logging { val maybeEngine = engineFactory() // EngineFactory return a base engine, which may not be deployable. - if (!maybeEngine.isInstanceOf[Engine[_,_,_,_,_,_]]) { - throw new NoSuchMethodException( + maybeEngine match { + case e: Engine[_, _, _, _, _, _] => e + case _ => throw new NoSuchMethodException( s"Engine $maybeEngine cannot be used for batch predict") } - - maybeEngine.asInstanceOf[Engine[_,_,_,_,_,_]] } def run[Q, P]( @@ -207,8 +206,8 @@ object BatchPredict extends Logging { // finally Serving.serve. val supplementedQuery = serving.supplementBase(query) // TODO: Parallelize the following. - val predictions = algorithms.zipWithIndex.map { case (a, ai) => - a.predictBase(models(ai), supplementedQuery) + val predictions = algorithms.zip(models).map { case (a, m) => + a.predictBase(m, supplementedQuery) } // Notice that it is by design to call Serving.serve with the // *original* query. http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala index 8f0aed7..2447682 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateServer.scala @@ -18,7 +18,7 @@ package org.apache.predictionio.workflow -import java.io.{PrintWriter, Serializable, StringWriter} +import java.io.Serializable import java.util.concurrent.TimeUnit import akka.actor._ @@ -32,6 +32,7 @@ import com.twitter.chill.{KryoBase, KryoInjection, ScalaKryoInstantiator} import com.typesafe.config.ConfigFactory import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer import grizzled.slf4j.Logging +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.predictionio.authentication.KeyAuthentication import org.apache.predictionio.configuration.SSLConfiguration import org.apache.predictionio.controller.{Engine, Params, Utils, WithPrId} @@ -309,7 +310,7 @@ class MasterActor ( sprayHttpListener.map { l => log.info("Server is shutting down.") l ! Http.Unbind(5.seconds) - system.shutdown + system.shutdown() } getOrElse { log.warning("No active server is running.") } @@ -353,7 +354,7 @@ class MasterActor ( } } else { log.error("Bind failed. Shutting down.") - system.shutdown + system.shutdown() } } @@ -432,13 +433,6 @@ class ServerActor[Q, P]( } } - def getStackTraceString(e: Throwable): String = { - val writer = new StringWriter() - val printWriter = new PrintWriter(writer) - e.printStackTrace(printWriter) - writer.toString - } - val myRoute = path("") { get { @@ -492,8 +486,8 @@ class ServerActor[Q, P]( // finally Serving.serve. val supplementedQuery = serving.supplementBase(query) // TODO: Parallelize the following. - val predictions = algorithms.zipWithIndex.map { case (a, ai) => - a.predictBase(models(ai), supplementedQuery) + val predictions = algorithms.zip(models).map { case (a, m) => + a.predictBase(m, supplementedQuery) } // Notice that it is by design to call Serving.serve with the // *original* query. @@ -533,7 +527,7 @@ class ServerActor[Q, P]( case id: WithPrId => Map("prId" -> id.prId) case _ => - Map() + Map.empty } val data = Map( // "appId" -> dataSourceParams.asInstanceOf[ParamsWithAppId].appId, @@ -596,7 +590,7 @@ class ServerActor[Q, P]( } catch { case e: MappingException => val msg = s"Query:\n$queryString\n\nStack Trace:\n" + - s"${getStackTraceString(e)}\n\n" + s"${ExceptionUtils.getStackTrace(e)}\n\n" log.error(msg) args.logUrl map { url => remoteLog( @@ -607,7 +601,7 @@ class ServerActor[Q, P]( complete(StatusCodes.BadRequest, e.getMessage) case e: Throwable => val msg = s"Query:\n$queryString\n\nStack Trace:\n" + - s"${getStackTraceString(e)}\n\n" + s"${ExceptionUtils.getStackTrace(e)}\n\n" log.error(msg) args.logUrl map { url => remoteLog( http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala index 899ace2..303ed06 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/CreateWorkflow.scala @@ -165,14 +165,14 @@ object CreateWorkflow extends Logging { } } - val pioEnvVars = wfc.env.map(e => - e.split(',').flatMap(p => + val pioEnvVars = wfc.env.map { e => + e.split(',').flatMap { p => p.split('=') match { case Array(k, v) => List(k -> v) case _ => Nil } - ).toMap - ).getOrElse(Map()) + }.toMap + }.getOrElse(Map.empty) if (evaluation.isEmpty) { val variantJson = parse(stringFromFile(wfc.engineVariant)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala index 78d86ac..cfc83eb 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/EngineServerPluginContext.scala @@ -41,9 +41,9 @@ class EngineServerPluginContext( val pluginParams: mutable.Map[String, JValue], val log: LoggingAdapter) { def outputBlockers: Map[String, EngineServerPlugin] = - plugins.getOrElse(EngineServerPlugin.outputBlocker, Map()).toMap + plugins.getOrElse(EngineServerPlugin.outputBlocker, Map.empty).toMap def outputSniffers: Map[String, EngineServerPlugin] = - plugins.getOrElse(EngineServerPlugin.outputSniffer, Map()).toMap + plugins.getOrElse(EngineServerPlugin.outputSniffer, Map.empty).toMap } object EngineServerPluginContext extends Logging { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala index 7bd9117..c1e6937 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowContext.scala @@ -28,8 +28,8 @@ import scala.language.existentials object WorkflowContext extends Logging { def apply( batch: String = "", - executorEnv: Map[String, String] = Map(), - sparkEnv: Map[String, String] = Map(), + executorEnv: Map[String, String] = Map.empty, + sparkEnv: Map[String, String] = Map.empty, mode: String = "" ): SparkContext = { val conf = new SparkConf() http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala index 0e578be..9a75415 100644 --- a/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala +++ b/core/src/main/scala/org/apache/predictionio/workflow/WorkflowUtils.scala @@ -60,12 +60,11 @@ object WorkflowUtils extends Logging { engineObject.instance.asInstanceOf[EngineFactory] ) } catch { - case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try { + case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => ( EngineLanguage.Java, Class.forName(engine).newInstance.asInstanceOf[EngineFactory] ) - } } } @@ -80,12 +79,11 @@ object WorkflowUtils extends Logging { epgObject.instance.asInstanceOf[EngineParamsGenerator] ) } catch { - case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try { + case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => ( EngineLanguage.Java, Class.forName(epg).newInstance.asInstanceOf[EngineParamsGenerator] ) - } } } @@ -99,12 +97,11 @@ object WorkflowUtils extends Logging { evaluationObject.instance.asInstanceOf[Evaluation] ) } catch { - case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => try { + case e @ (_: NoSuchFieldException | _: ClassNotFoundException) => ( EngineLanguage.Java, Class.forName(evaluation).newInstance.asInstanceOf[Evaluation] ) - } } } @@ -265,9 +262,9 @@ object WorkflowUtils extends Logging { Seq(file.toURI) } else { warn(s"Environment variable $p is pointing to a nonexistent file $f. Ignoring.") - Seq.empty[URI] + Seq.empty } - } getOrElse Seq.empty[URI] + } getOrElse Seq.empty ) } @@ -325,8 +322,8 @@ object WorkflowUtils extends Logging { error("Arrays are not allowed in the sparkConf section of engine.js.") sys.exit(1) } - case JNothing => List() - case _ => List(List() -> jv.values.toString) + case JNothing => Nil + case _ => List(Nil -> jv.values.toString) } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html ---------------------------------------------------------------------- diff --git a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html index 62fe5a7..5040796 100644 --- a/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html +++ b/core/src/main/twirl/org/apache/predictionio/workflow/index.scala.html @@ -75,13 +75,13 @@ <h2>Algorithms and Models</h2> <table class="table table-bordered table-striped"> <tr><th>#</th><th colspan="2">Information</th></tr> - @for(a <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) { - <tr> - <th rowspan="3">@{a._2 + 1}</th> - <th>Class</th><td>@{a._1._1._1}</td> - </tr> - <tr><th>Parameters</th><td>@{a._1._1._2}</td></tr> - <tr><th>Model</th><td>@{a._1._2}</td></tr> + @for((((algo, param), model), i) <- algorithms.zip(algorithmsParams).zip(models).zipWithIndex) { + <tr> + <th rowspan="3">@{i + 1}</th> + <th>Class</th><td>@{algo}</td> + </tr> + <tr><th>Parameters</th><td>@{param}</td></tr> + <tr><th>Model</th><td>@{model}</td></tr> } </table> <h2>Serving</h2> http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala index 94879a5..fb10c94 100644 --- a/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala +++ b/core/src/test/scala/org/apache/predictionio/controller/EngineTest.scala @@ -320,7 +320,7 @@ class EngineTrainSuite extends FunSuite with SharedSparkContext { sc, new PDataSource0(0), new PPreparator0(1), - Seq(), + Nil, defaultWorkflowParams ) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/Common.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala index a579add..60efea2 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/Common.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/Common.scala @@ -25,7 +25,6 @@ import spray.routing._ import spray.routing.Directives._ import spray.routing.Rejection import spray.http.StatusCodes -import spray.http.StatusCode import spray.httpx.Json4sSupport import org.json4s.Formats http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala index 75c2227..41dfefb 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala @@ -131,13 +131,13 @@ class EventServiceActor( private val FailedAuth = Left( AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsRejected, List() + AuthenticationFailedRejection.CredentialsRejected, Nil ) ) private val MissedAuth = Left( AuthenticationFailedRejection( - AuthenticationFailedRejection.CredentialsMissing, List() + AuthenticationFailedRejection.CredentialsMissing, Nil ) ) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala index 36f3f73..cd14cc4 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServerPluginContext.scala @@ -30,10 +30,10 @@ class EventServerPluginContext( val plugins: mutable.Map[String, mutable.Map[String, EventServerPlugin]], val log: LoggingAdapter) { def inputBlockers: Map[String, EventServerPlugin] = - plugins.getOrElse(EventServerPlugin.inputBlocker, Map()).toMap + plugins.getOrElse(EventServerPlugin.inputBlocker, Map.empty).toMap def inputSniffers: Map[String, EventServerPlugin] = - plugins.getOrElse(EventServerPlugin.inputSniffer, Map()).toMap + plugins.getOrElse(EventServerPlugin.inputSniffer, Map.empty).toMap } object EventServerPluginContext extends Logging { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala index ba2b575..9bbbc2e 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/Stats.scala @@ -67,7 +67,7 @@ class Stats(val startTime: DateTime) { m .toSeq .flatMap { case (k, v) => - if (k._1 == appId) { Seq(KV(k._2, v)) } else { Seq() } + if (k._1 == appId) { Seq(KV(k._2, v)) } else { Nil } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala index 87a4600..57be037 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/Webhooks.scala @@ -18,22 +18,13 @@ package org.apache.predictionio.data.api -import org.apache.predictionio.data.webhooks.JsonConnector -import org.apache.predictionio.data.webhooks.FormConnector import org.apache.predictionio.data.webhooks.ConnectorUtil -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventJson4sSupport import org.apache.predictionio.data.storage.LEvents -import spray.routing._ -import spray.routing.Directives._ import spray.http.StatusCodes import spray.http.StatusCode import spray.http.FormData -import spray.httpx.Json4sSupport -import org.json4s.Formats -import org.json4s.DefaultFormats import org.json4s.JObject import akka.event.LoggingAdapter @@ -61,14 +52,13 @@ private[predictionio] object Webhooks { } } - eventFuture.flatMap { eventOpt => - if (eventOpt.isEmpty) { + eventFuture.flatMap { + case None => Future successful { val message = s"webhooks connection for ${web} is not supported." (StatusCodes.NotFound, Map("message" -> message)) } - } else { - val event = eventOpt.get + case Some(event) => val data = eventClient.futureInsert(event, appId, channelId).map { id => val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) @@ -78,7 +68,6 @@ private[predictionio] object Webhooks { result } data - } } } @@ -114,14 +103,13 @@ private[predictionio] object Webhooks { } } - eventFuture.flatMap { eventOpt => - if (eventOpt.isEmpty) { - Future { + eventFuture.flatMap { + case None => + Future successful { val message = s"webhooks connection for ${web} is not supported." (StatusCodes.NotFound, Map("message" -> message)) } - } else { - val event = eventOpt.get + case Some(event) => val data = eventClient.futureInsert(event, appId, channelId).map { id => val result = (StatusCodes.Created, Map("eventId" -> s"${id}")) @@ -131,7 +119,6 @@ private[predictionio] object Webhooks { result } data - } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala index 7c3aad0..82d62f8 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EngineInstances.scala @@ -118,8 +118,8 @@ class EngineInstanceSerializer engineVariant = "", engineFactory = "", batch = "", - env = Map(), - sparkConf = Map(), + env = Map.empty, + sparkConf = Map.empty, dataSourceParams = "", preparatorParams = "", algorithmsParams = "", http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala index a40adb3..5714fde 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EvaluationInstances.scala @@ -47,8 +47,8 @@ case class EvaluationInstance( evaluationClass: String = "", engineParamsGeneratorClass: String = "", batch: String = "", - env: Map[String, String] = Map(), - sparkConf: Map[String, String] = Map(), + env: Map[String, String] = Map.empty, + sparkConf: Map[String, String] = Map.empty, evaluatorResults: String = "", evaluatorResultsHTML: String = "", evaluatorResultsJSON: String = "") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala index ba20c61..57f0472 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/EventJson4sSupport.scala @@ -56,7 +56,7 @@ object EventJson4sSupport { val targetEntityType = fields.getOpt[String]("targetEntityType") val targetEntityId = fields.getOpt[String]("targetEntityId") val properties = fields.getOrElse[Map[String, JValue]]( - "properties", Map()) + "properties", Map.empty) // default currentTime expressed as UTC timezone lazy val currentTime = DateTime.now(EventValidation.defaultTimeZone) val eventTime = fields.getOpt[String]("eventTime") @@ -70,7 +70,6 @@ object EventJson4sSupport { }.getOrElse(currentTime) // disable tags from API for now. - val tags = List() // val tags = fields.getOpt[Seq[String]]("tags").getOrElse(List()) val prId = fields.getOpt[String]("prId") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala index 8e58384..5cf8ffc 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/PEventAggregator.scala @@ -22,8 +22,6 @@ import org.joda.time.DateTime import org.json4s.JValue -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD // each JValue data associated with the time it is set http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala index fd05767..52442a6 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/Storage.scala @@ -18,8 +18,6 @@ package org.apache.predictionio.data.storage -import java.lang.reflect.InvocationTargetException - import grizzled.slf4j.Logging import org.apache.predictionio.annotation.DeveloperApi @@ -78,7 +76,7 @@ trait BaseStorageClient { case class StorageClientConfig( parallel: Boolean = false, // parallelized access (RDD)? test: Boolean = false, // test mode config - properties: Map[String, String] = Map()) + properties: Map[String, String] = Map.empty) /** :: DeveloperApi :: * Thrown when a StorageClient runs into an exceptional condition http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala index d9c53ef..4f531b6 100644 --- a/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala +++ b/data/src/main/scala/org/apache/predictionio/data/view/LBatchView.scala @@ -136,7 +136,6 @@ class EventSeq(val events: List[Event]) { events .groupBy( _.entityId ) .mapValues( _.sortBy(_.eventTime.getMillis).foldLeft[T](init)(op)) - .toMap } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala index 30f9c17..b6b509c 100644 --- a/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala +++ b/data/src/main/scala/org/apache/predictionio/data/view/QuickTest.scala @@ -18,10 +18,6 @@ package org.apache.predictionio.data.view -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.LEvents -import org.apache.predictionio.data.storage.EventValidation -import org.apache.predictionio.data.storage.DataMap import org.apache.predictionio.data.storage.Storage import scala.concurrent.ExecutionContext.Implicits.global // TODO http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala ---------------------------------------------------------------------- diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala index 114c6b9..dae9074 100644 --- a/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala +++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/BinaryVectorizer.scala @@ -18,7 +18,6 @@ package org.apache.predictionio.e2.engine import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.linalg.Vector import scala.collection.immutable.HashMap http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala ---------------------------------------------------------------------- diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala index 9dc6f9d..90b5267 100644 --- a/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala +++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/CategoricalNaiveBayes.scala @@ -17,7 +17,6 @@ package org.apache.predictionio.e2.engine -import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD /** Class for training a naive Bayes model with categorical variables */ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala ---------------------------------------------------------------------- diff --git a/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala b/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala index 3c3ac34..130c53a 100644 --- a/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala +++ b/e2/src/main/scala/org/apache/predictionio/e2/engine/MarkovChain.scala @@ -17,7 +17,6 @@ package org.apache.predictionio.e2.engine -import org.apache.spark.SparkContext._ import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix import org.apache.spark.mllib.linalg.{SparseVector, Vectors} import org.apache.spark.rdd.RDD http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index 0b319ab..6afed12 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -141,7 +141,7 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) try { val json = ("query" -> - ("match_all" -> List.empty)) + ("match_all" -> Nil)) ESUtils.getAll[App](restClient, index, estype, compact(render(json))) } catch { case e: IOException => http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala index ec72a49..749ab49 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala @@ -80,7 +80,7 @@ object ESEventsUtil { targetEntityId = targetEntityId, properties = properties, eventTime = eventTime, - tags = Seq(), + tags = Nil, prId = prId, creationTime = creationTime ) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index 6240059..6c0c4a7 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -28,7 +28,6 @@ import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.Event import org.apache.predictionio.data.storage.LEvents import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient import org.joda.time.DateTime import org.json4s._ import org.json4s.JsonDSL._ @@ -36,7 +35,6 @@ import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization.write import org.json4s.ext.JodaTimeSerializers import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException import org.apache.http.message.BasicHeader class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) @@ -217,29 +215,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St } } - private def exists(restClient: RestClient, estype: String, id: Int): Boolean = { - try { - restClient.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava).getStatusLine.getStatusCode match { - case 200 => true - case _ => false - } - } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => false - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - false - } - case e: IOException => - error(s"Failed to access to $index/$estype/$id", e) - false - } - } - override def futureGet( eventId: String, appId: Int, @@ -288,7 +263,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St val response = restClient.performRequest( "POST", s"/$index/$estype/_delete_by_query", - Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava) + Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, + entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) val result = (jsonResponse \ "result").extract[String] result match { @@ -331,7 +307,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St } catch { case e: IOException => error(e.getMessage) - Iterator[Event]() + Iterator.empty } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala index b9ad8bb..9f0a188 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala @@ -120,7 +120,8 @@ class ESPEvents(client: ESClient, config: StorageClientConfig, index: String) val response = restClient.performRequest( "POST", s"/$index/$estype/_delete_by_query", - Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava) + Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, + entity) val jsonResponse = parse(EntityUtils.toString(response.getEntity)) val result = (jsonResponse \ "result").extract[String] result match { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index 3f7b058..cd9aa53 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -44,16 +44,13 @@ object ESUtils { implicit formats: Formats): Event = { def getString(s: String): String = { (value \ s) match { - case x if x == JNothing => null + case JNothing => null case x => x.extract[String] } } def getOptString(s: String): Option[String] = { - getString(s) match { - case null => None - case x => Some(x) - } + Option(getString(s)) } val properties: DataMap = getOptString("properties") @@ -233,14 +230,14 @@ object ESUtils { targetEntityId: Option[Option[String]] = None, reversed: Option[Boolean] = None): String = { val mustQueries = Seq( - startTime.map(x => { + startTime.map { x => val v = formatUTCDateTime(x) s"""{"range":{"eventTime":{"gte":"${v}"}}}""" - }), - untilTime.map(x => { + }, + untilTime.map { x => val v = formatUTCDateTime(x) s"""{"range":{"eventTime":{"lt":"${v}"}}}""" - }), + }, entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""), entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""), targetEntityType.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityType":"${x}"}}""")), http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 077168a..5e3abe2 100644 --- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -95,7 +95,7 @@ class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) } catch { case e: ElasticsearchException => error(e.getMessage) - Seq[AccessKey]() + Nil } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index 3781a4b..270af0e 100644 --- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -105,7 +105,7 @@ class ESApps(client: Client, config: StorageClientConfig, index: String) } catch { case e: ElasticsearchException => error(e.getMessage) - Seq[App]() + Nil } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 21690bf..2d6056b 100644 --- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -107,7 +107,7 @@ class ESEngineInstances(client: Client, config: StorageClientConfig, index: Stri } catch { case e: ElasticsearchException => error(e.getMessage) - Seq() + Nil } } @@ -127,7 +127,7 @@ class ESEngineInstances(client: Client, config: StorageClientConfig, index: Stri } catch { case e: ElasticsearchException => error(e.getMessage) - Seq() + Nil } } @@ -150,7 +150,7 @@ class ESEngineInstances(client: Client, config: StorageClientConfig, index: Stri def delete(id: String): Unit = { try { - val response = client.prepareDelete(index, estype, id).get + client.prepareDelete(index, estype, id).get } catch { case e: ElasticsearchException => error(e.getMessage) } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 85bf820..68c5a74 100644 --- a/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch1/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -101,7 +101,7 @@ class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: } catch { case e: ElasticsearchException => error(e.getMessage) - Seq() + Nil } } @@ -114,7 +114,7 @@ class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: } catch { case e: ElasticsearchException => error(e.getMessage) - Seq() + Nil } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala index 2cdb734..64487fb 100644 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBEventsUtil.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.client.Scan import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.filter.FilterList -import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.SingleColumnValueFilter import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.filter.BinaryComparator @@ -275,7 +274,7 @@ object HBEventsUtil { targetEntityId = targetEntityId, properties = properties, eventTime = eventTime, - tags = Seq(), + tags = Nil, prId = prId, creationTime = creationTime ) @@ -375,34 +374,28 @@ object HBEventsUtil { } } - targetEntityType.foreach { tetOpt => - if (tetOpt.isEmpty) { + targetEntityType.foreach { + case None => val filter = createSkipRowIfColumnExistFilter("targetEntityType") filters.addFilter(filter) - } else { - tetOpt.foreach { tet => - val filter = createBinaryFilter( - "targetEntityType", Bytes.toBytes(tet)) - // the entire row will be skipped if the column is not found. - filter.setFilterIfMissing(true) - filters.addFilter(filter) - } - } + case Some(tet) => + val filter = createBinaryFilter( + "targetEntityType", Bytes.toBytes(tet)) + // the entire row will be skipped if the column is not found. + filter.setFilterIfMissing(true) + filters.addFilter(filter) } - targetEntityId.foreach { teidOpt => - if (teidOpt.isEmpty) { + targetEntityId.foreach { + case None => val filter = createSkipRowIfColumnExistFilter("targetEntityId") filters.addFilter(filter) - } else { - teidOpt.foreach { teid => - val filter = createBinaryFilter( - "targetEntityId", Bytes.toBytes(teid)) - // the entire row will be skipped if the column is not found. - filter.setFilterIfMissing(true) - filters.addFilter(filter) - } - } + case Some(teid) => + val filter = createBinaryFilter( + "targetEntityId", Bytes.toBytes(teid)) + // the entire row will be skipped if the column is not found. + filter.setFilterIfMissing(true) + filters.addFilter(filter) } if (!filters.getFilters().isEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala index cc07fa4..795cf7e 100644 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/upgrade/HB_0_8_0.scala @@ -185,7 +185,7 @@ object HB_0_8_0 { targetEntityId = targetEntityId, properties = properties, eventTime = new DateTime(rowKey.millis, eventTimeZone), - tags = Seq(), + tags = Nil, prId = prId, creationTime = creationTime ) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala index 35572c9..c76d203 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/RunBatchPredict.scala @@ -59,10 +59,10 @@ object RunBatchPredict extends Logging { "--engine-variant", batchPredictArgs.variantJson.getOrElse( new File(engineDirPath, "engine.json")).getCanonicalPath) ++ - (if (batchPredictArgs.queryPartitions.isEmpty) Seq() + (if (batchPredictArgs.queryPartitions.isEmpty) Nil else Seq("--query-partitions", batchPredictArgs.queryPartitions.get.toString)) ++ - (if (verbose) Seq("--verbose") else Seq()) ++ + (if (verbose) Seq("--verbose") else Nil) ++ Seq("--json-extractor", batchPredictArgs.jsonExtractor.toString) Runner.runOnSpark( http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala index 6df0750..a25f4e0 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/RunWorkflow.scala @@ -64,25 +64,25 @@ object RunWorkflow extends Logging { "--verbosity", wa.verbosity.toString) ++ wa.engineFactory.map( - x => Seq("--engine-factory", x)).getOrElse(Seq()) ++ + x => Seq("--engine-factory", x)).getOrElse(Nil) ++ wa.engineParamsKey.map( - x => Seq("--engine-params-key", x)).getOrElse(Seq()) ++ - (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++ - (if (verbose) Seq("--verbose") else Seq()) ++ - (if (wa.skipSanityCheck) Seq("--skip-sanity-check") else Seq()) ++ - (if (wa.stopAfterRead) Seq("--stop-after-read") else Seq()) ++ + x => Seq("--engine-params-key", x)).getOrElse(Nil) ++ + (if (wa.batch != "") Seq("--batch", wa.batch) else Nil) ++ + (if (verbose) Seq("--verbose") else Nil) ++ + (if (wa.skipSanityCheck) Seq("--skip-sanity-check") else Nil) ++ + (if (wa.stopAfterRead) Seq("--stop-after-read") else Nil) ++ (if (wa.stopAfterPrepare) { Seq("--stop-after-prepare") } else { - Seq() + Nil }) ++ wa.evaluation.map(x => Seq("--evaluation-class", x)). - getOrElse(Seq()) ++ + getOrElse(Nil) ++ // If engineParamsGenerator is specified, it overrides the evaluation. wa.engineParamsGenerator.orElse(wa.evaluation) .map(x => Seq("--engine-params-generator-class", x)) - .getOrElse(Seq()) ++ - (if (wa.batch != "") Seq("--batch", wa.batch) else Seq()) ++ + .getOrElse(Nil) ++ + (if (wa.batch != "") Seq("--batch", wa.batch) else Nil) ++ Seq("--json-extractor", wa.jsonExtractor.toString) Runner.runOnSpark( http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala index 4a721be..4e266c8 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/Runner.scala @@ -30,7 +30,7 @@ import scala.sys.process._ case class SparkArgs( sparkHome: Option[String] = None, - sparkPassThrough: Seq[String] = Seq(), + sparkPassThrough: Seq[String] = Nil, sparkKryo: Boolean = false, scratchUri: Option[URI] = None) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala index 94a1a03..710c23a 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/admin/CommandClient.scala @@ -84,11 +84,11 @@ class CommandClient( val accessKey = AccessKey( key = "", appid = id, - events = Seq()) + events = Nil) val accessKey2 = accessKeyClient.insert(AccessKey( key = "", appid = id, - events = Seq())) + events = Nil)) accessKey2 map { k => new AppNewResponse(1,"App created successfully.",id, req.name, k) } getOrElse { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala index 44fa667..5884ebd 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/App.scala @@ -67,7 +67,7 @@ object App extends EitherLogging { val newKey = storage.AccessKey( key = accessKey, appid = id, - events = Seq()) + events = Nil) accessKeys.insert(newKey) .map { k => Right(AppDescription( @@ -85,7 +85,7 @@ object App extends EitherLogging { errStr += s""" |Failed to revert back the App meta-data change. |The app ${name} CANNOT be used! - |Please run 'pio app delete ${name}' to delete this app!""" + |Please run 'pio app delete ${name}' to delete this app!""".stripMargin } logAndFail(errStr) } @@ -209,12 +209,12 @@ object App extends EitherLogging { channels.find(ch => ch.name == chName) match { case None => return logAndFail(s"""Unable to delete data for channel. - |Channel ${chName} doesn't exist.""") + |Channel ${chName} doesn't exist.""".stripMargin) case Some(ch) => Seq(Some(ch.id)) } - } getOrElse { - Seq(None) // for default channel - } + } getOrElse { + Seq(None) // for default channel + } } chanIdsToRemove.map { chId: Option[Int] => @@ -246,8 +246,7 @@ object App extends EitherLogging { errStr = if (chId.isDefined) { s"Unable to initialize Event Store for the channel ID: ${chId.get}." - } - else { + } else { s"Unable to initialize Event tore for the app ID: ${appDesc.app.id}." } error(errStr) @@ -272,11 +271,11 @@ object App extends EitherLogging { show(appName).right flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) => if (channels.find(ch => ch.name == newChannel).isDefined) { logAndFail(s"""Channel ${newChannel} already exists. - |Unable to create new channel.""") + |Unable to create new channel.""".stripMargin) } else if (!storage.Channel.isValidName(newChannel)) { logAndFail(s"""Unable to create new channel. |The channel name ${newChannel} is invalid. - |${storage.Channel.nameConstraint}""") + |${storage.Channel.nameConstraint}""".stripMargin) } else { val channel = Channel( @@ -299,7 +298,7 @@ object App extends EitherLogging { Right(channel.copy(id = chanId)) } else { errStr = s"""Unable to create new channel. - |Failed to initalize Event Store.""" + |Failed to initalize Event Store.""".stripMargin error(errStr) // reverted back the meta data try { @@ -307,17 +306,17 @@ object App extends EitherLogging { Left(errStr) } catch { case e: Exception => - val nextErrStr = s""" + val nextErrStr = (s""" |Failed to revert back the Channel meta-data change. |The channel ${newChannel} CANNOT be used! |Please run 'pio app channel-delete ${appName} ${newChannel}'""" + - " to delete this channel!" + " to delete this channel!").stripMargin logAndFail(errStr + nextErrStr) } } } getOrElse { logAndFail(s"""Unable to create new channel. - |Failed to update Channel meta-data.""") + |Failed to update Channel meta-data.""".stripMargin) } } } @@ -329,33 +328,32 @@ object App extends EitherLogging { def channelDelete(appName: String, deleteChannel: String): MaybeError = { val chanStorage = storage.Storage.getMetaDataChannels val events = storage.Storage.getLEvents() - var errStr = "" try { show(appName).right.flatMap { case (appDesc: AppDescription, channels: Seq[Channel]) => val foundChannel = channels.find(ch => ch.name == deleteChannel) - if (foundChannel.isEmpty) { - logAndFail(s"""Unable to delete channel - |Channel ${deleteChannel} doesn't exists.""") - } else { - val chId = foundChannel.get.id - val dbRemoved = events.remove(appDesc.app.id, Some(chId)) - if (dbRemoved) { - info(s"Removed Event Store for this channel: ${deleteChannel}") - try { - chanStorage.delete(chId) - logAndSucceed(s"Deleted channel: ${deleteChannel}.") - } catch { - case e: Exception => - logAndFail(s"""Unable to delete channel. - |Failed to update Channel meta-data. - |The channel ${deleteChannel} CANNOT be used! - |Please run 'pio app channel-delete ${appDesc.app.name} ${deleteChannel}'""" + - " to delete this channel again!") + foundChannel match { + case None => + logAndFail(s"""Unable to delete channel + |Channel ${deleteChannel} doesn't exists.""".stripMargin) + case Some(channel) => + val dbRemoved = events.remove(appDesc.app.id, Some(channel.id)) + if (dbRemoved) { + info(s"Removed Event Store for this channel: ${deleteChannel}") + try { + chanStorage.delete(channel.id) + logAndSucceed(s"Deleted channel: ${deleteChannel}.") + } catch { + case e: Exception => + logAndFail((s"""Unable to delete channel. + |Failed to update Channel meta-data. + |The channel ${deleteChannel} CANNOT be used! + |Please run 'pio app channel-delete ${appDesc.app.name} ${deleteChannel}'""" + + " to delete this channel again!").stripMargin) + } + } else { + logAndFail(s"""Unable to delete channel. + |Error removing Event Store for this channel.""".stripMargin) } - } else { - logAndFail(s"""Unable to delete channel. - |Error removing Event Store for this channel.""") - } } } } finally { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala index 30c249b..ee8cd50 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/commands/Management.scala @@ -49,7 +49,7 @@ case class PioStatus( sparkHome: String = "", sparkVersion: String = "", sparkMinVersion: String = "", - warnings: Seq[String] = Seq()) + warnings: Seq[String] = Nil) object Management extends EitherLogging { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/6789dbeb/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala ---------------------------------------------------------------------- diff --git a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala index 4a72635..acd7598 100644 --- a/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala +++ b/tools/src/main/scala/org/apache/predictionio/tools/console/Console.scala @@ -49,12 +49,12 @@ case class ConsoleArgs( dashboard: DashboardArgs = DashboardArgs(), export: ExportArgs = ExportArgs(), imprt: ImportArgs = ImportArgs(), - commands: Seq[String] = Seq(), + commands: Seq[String] = Nil, metricsParamsJsonPath: Option[String] = None, paramsPath: String = "params", engineInstanceId: Option[String] = None, mainClass: Option[String] = None, - driverPassThrough: Seq[String] = Seq(), + driverPassThrough: Seq[String] = Nil, pioHome: Option[String] = None, verbose: Boolean = false) @@ -69,7 +69,7 @@ case class AppArgs( case class AccessKeyArgs( accessKey: String = "", - events: Seq[String] = Seq()) + events: Seq[String] = Nil) case class EngineInfo( engineId: String, @@ -763,7 +763,7 @@ object Console extends Logging { } } - def help(commands: Seq[String] = Seq()): String = { + def help(commands: Seq[String] = Nil): String = { if (commands.isEmpty) { mainHelp } else {
