// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Utils.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/Utils.scala b/core/src/main/scala/io/prediction/controller/Utils.scala
// deleted file mode 100644
// index 5098fba..0000000
// --- a/core/src/main/scala/io/prediction/controller/Utils.scala
// +++ /dev/null
// @@ -1,69 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller

import io.prediction.workflow.KryoInstantiator

import org.json4s._
import org.json4s.ext.JodaTimeSerializers

import scala.io.Source

import _root_.java.io.File
import _root_.java.io.FileOutputStream

/** Controller utilities.
  *
  * @group Helper
  */
object Utils {
  /** Default JSON4S serializers for PredictionIO controllers. */
  val json4sDefaultFormats = DefaultFormats.lossless ++ JodaTimeSerializers.all

  /** Directory used by [[save]] and [[load]]: the PIO_FS_TMPDIR environment
    * variable when set, otherwise the java.io.tmpdir system property.
    */
  private def modelDir: String =
    sys.env.getOrElse("PIO_FS_TMPDIR", System.getProperty("java.io.tmpdir"))

  /** Save a model object as a file to a temporary location on local filesystem.
    * It will first try to use the location indicated by the environmental
    * variable PIO_FS_TMPDIR, then fall back to the java.io.tmpdir property.
    *
    * The model is serialized before the target file is opened, so a failing
    * serialization does not truncate an existing file, and the output stream
    * is closed even when the write throws.
    *
    * @param id Used as the filename of the file.
    * @param model Model object.
    */
  def save(id: String, model: Any): Unit = {
    val tmpdir = modelDir
    val modelFile = tmpdir + File.separator + id
    (new File(tmpdir)).mkdirs()
    val kryo = KryoInstantiator.newKryoInjection
    val bytes: Array[Byte] = kryo(model)
    val fos = new FileOutputStream(modelFile)
    try {
      fos.write(bytes)
    } finally {
      fos.close()
    }
  }

  /** Load a model object from a file in a temporary location on local
    * filesystem. It will first try to use the location indicated by the
    * environmental variable PIO_FS_TMPDIR, then fall back to the java.io.tmpdir
    * property.
    *
    * The file handle is released even when deserialization fails; the failure
    * itself is still propagated to the caller by `.get`.
    *
    * @param id Used as the filename of the file.
    * @return The deserialized model object.
    */
  def load(id: String): Any = {
    val modelFile = modelDir + File.separator + id
    val kryo = KryoInstantiator.newKryoInjection
    // ISO-8859-1 decodes each byte to the char with the same code point, so
    // narrowing every char back with toByte recovers the raw file bytes.
    val src = Source.fromFile(modelFile)(scala.io.Codec.ISO8859)
    try {
      kryo.invert(src.map(_.toByte).toArray).get
    } finally {
      src.close()
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala b/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala
// deleted file mode 100644
// index f932012..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala
// +++ /dev/null
// @@ -1,39 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.EngineParams
import io.prediction.controller.EngineParamsGenerator

import scala.collection.JavaConversions.asScalaBuffer

/** Define an engine parameter generator in Java
  *
  * Implementations of this abstract class can be supplied to "pio eval" as the second
  * command line argument.
  *
  * @group Evaluation
  */
abstract class JavaEngineParamsGenerator extends EngineParamsGenerator {

  /** Set the list of [[EngineParams]].
    *
    * The Java list is passed through `asScalaBuffer` and assigned to
    * `engineParamsList`.
    *
    * @param engineParams A list of engine params
    */
  def setEngineParamsList(engineParams: java.util.List[_ <: EngineParams]): Unit = {
    engineParamsList = asScalaBuffer(engineParams)
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala b/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala
// deleted file mode 100644
// index 3db89bf..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala
// +++ /dev/null
// @@ -1,66 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.Evaluation
import io.prediction.controller.Metric
import io.prediction.core.BaseEngine

import scala.collection.JavaConversions.asScalaBuffer

/** Define an evaluation in Java.
  *
  * Implementations of this abstract class can be supplied to "pio eval" as the first
  * argument.
  *
  * @group Evaluation
  */

abstract class JavaEvaluation extends Evaluation {
  /** Set the [[BaseEngine]] and [[Metric]] for this [[Evaluation]]
    *
    * Assigns the pair to `engineMetric`.
    *
    * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]]
    * @param metric [[Metric]] for this [[JavaEvaluation]]
    * @tparam EI Evaluation information class
    * @tparam Q Query class
    * @tparam P Predicted result class
    * @tparam A Actual result class
    */
  def setEngineMetric[EI, Q, P, A](
    baseEngine: BaseEngine[EI, Q, P, A],
    metric: Metric[EI, Q, P, A, _]): Unit = {

    engineMetric = (baseEngine, metric)
  }

  /** Set the [[BaseEngine]] and [[Metric]]s for this [[JavaEvaluation]]
    *
    * Assigns the triple to `engineMetrics`; the Java list of additional
    * metrics is passed through `asScalaBuffer` first.
    *
    * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]]
    * @param metric [[Metric]] for this [[JavaEvaluation]]
    * @param metrics Other [[Metric]]s for this [[JavaEvaluation]]
    * @tparam EI Evaluation information class
    * @tparam Q Query class
    * @tparam P Predicted result class
    * @tparam A Actual result class
    */
  def setEngineMetrics[EI, Q, P, A](
    baseEngine: BaseEngine[EI, Q, P, A],
    metric: Metric[EI, Q, P, A, _],
    metrics: java.util.List[_ <: Metric[EI, Q, P, A, _]]): Unit = {

    engineMetrics = (baseEngine, metric, asScalaBuffer(metrics))
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala b/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala
// deleted file mode 100644
// index ba6ed2d..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala
// +++ /dev/null
// @@ -1,31 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.LAlgorithm

import scala.reflect.ClassTag

/** Base class of a Java local algorithm. Refer to [[LAlgorithm]] for documentation.
  *
  * The second argument list of [[LAlgorithm]]'s constructor is filled in
  * explicitly with `ClassTag.AnyRef` cast to `ClassTag[M]`, so subclasses do
  * not have to provide a class tag for the model type themselves.
  * NOTE(review): presumably done because Java subclasses cannot supply a
  * Scala `ClassTag`; confirm against [[LAlgorithm]].
  *
  * @tparam PD Prepared data class.
  * @tparam M Trained model class.
  * @tparam Q Input query class.
  * @tparam P Output prediction class.
  * @group Algorithm
  */
abstract class LJavaAlgorithm[PD, M, Q, P]
  extends LAlgorithm[PD, M, Q, P]()(ClassTag.AnyRef.asInstanceOf[ClassTag[M]])

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala b/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala
// deleted file mode 100644
// index dfafba4..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala
// +++ /dev/null
// @@ -1,31 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.LDataSource

import scala.reflect.ClassTag

/** Base class of a Java local data source. Refer to [[LDataSource]] for documentation.
  *
  * Passes `ClassTag.AnyRef`, cast to `ClassTag[TD]`, explicitly as the second
  * argument list of [[LDataSource]]'s constructor.
  *
  * @tparam TD Training data class.
  * @tparam EI Evaluation Info class.
  * @tparam Q Input query class.
  * @tparam A Actual value class.
  * @group Data Source
  */
abstract class LJavaDataSource[TD, EI, Q, A]
  extends LDataSource[TD, EI, Q, A]()(ClassTag.AnyRef.asInstanceOf[ClassTag[TD]])

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala b/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala
// deleted file mode 100644
// index 321a100..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala
// +++ /dev/null
// @@ -1,29 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.LPreparator

import scala.reflect.ClassTag

/** Base class of a Java local preparator. Refer to [[LPreparator]] for documentation.
  *
  * Passes `ClassTag.AnyRef`, cast to `ClassTag[PD]`, explicitly as the second
  * argument list of [[LPreparator]]'s constructor.
  *
  * @tparam TD Training data class.
  * @tparam PD Prepared data class.
  * @group Preparator
  */
abstract class LJavaPreparator[TD, PD]
  extends LPreparator[TD, PD]()(ClassTag.AnyRef.asInstanceOf[ClassTag[PD]])

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala b/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala
// deleted file mode 100644
// index f664c38..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala
// +++ /dev/null
// @@ -1,26 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.LServing

/** Base class of Java local serving. Refer to [[LServing]] for documentation.
  *
  * Adds no members of its own; it only extends [[LServing]] with the same
  * type parameters.
  *
  * @tparam Q Input query class.
  * @tparam P Output prediction class.
  * @group Serving
  */
abstract class LJavaServing[Q, P] extends LServing[Q, P]

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala b/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala
// deleted file mode 100644
// index fcf81a0..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala
// +++ /dev/null
// @@ -1,33 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.P2LAlgorithm

import scala.reflect.ClassTag

/** Base class of a Java parallel-to-local algorithm. Refer to [[P2LAlgorithm]] for documentation.
  *
  * Passes two class tags explicitly as the second argument list of
  * [[P2LAlgorithm]]'s constructor: `ClassTag.AnyRef` cast to `ClassTag[M]`
  * and `ClassTag.AnyRef` cast to `ClassTag[Q]`.
  *
  * @tparam PD Prepared data class.
  * @tparam M Trained model class.
  * @tparam Q Input query class.
  * @tparam P Output prediction class.
  * @group Algorithm
  */
abstract class P2LJavaAlgorithm[PD, M, Q, P]
  extends P2LAlgorithm[PD, M, Q, P]()(
    ClassTag.AnyRef.asInstanceOf[ClassTag[M]],
    ClassTag.AnyRef.asInstanceOf[ClassTag[Q]])

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala b/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala
// deleted file mode 100644
// index d3a370a..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala
// +++ /dev/null
// @@ -1,28 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.PAlgorithm

/** Base class of a Java parallel algorithm. Refer to [[PAlgorithm]] for documentation.
  *
  * Adds no members of its own; it only extends [[PAlgorithm]] with the same
  * type parameters.
  *
  * @tparam PD Prepared data class.
  * @tparam M Trained model class.
  * @tparam Q Input query class.
  * @tparam P Output prediction class.
  * @group Algorithm
  */
abstract class PJavaAlgorithm[PD, M, Q, P] extends PAlgorithm[PD, M, Q, P]

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala b/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala
// deleted file mode 100644
// index 11b962d..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala
// +++ /dev/null
// @@ -1,28 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.PDataSource

/** Base class of a Java parallel data source. Refer to [[PDataSource]] for documentation.
  *
  * Adds no members of its own; it only extends [[PDataSource]] with the same
  * type parameters.
  *
  * @tparam TD Training data class.
  * @tparam EI Evaluation Info class.
  * @tparam Q Input query class.
  * @tparam A Actual value class.
  * @group Data Source
  */
abstract class PJavaDataSource[TD, EI, Q, A] extends PDataSource[TD, EI, Q, A]

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala b/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala
// deleted file mode 100644
// index 2a9c8f9..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala
// +++ /dev/null
// @@ -1,26 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import io.prediction.controller.PPreparator

/** Base class of a Java parallel preparator. Refer to [[PPreparator]] for documentation.
  *
  * Adds no members of its own; it only extends [[PPreparator]] with the same
  * type parameters.
  *
  * @tparam TD Training data class.
  * @tparam PD Prepared data class.
  * @group Preparator
  */
abstract class PJavaPreparator[TD, PD] extends PPreparator[TD, PD]

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala b/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala
// deleted file mode 100644
// index 0e92f32..0000000
// --- a/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala
// +++ /dev/null
// @@ -1,20 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.controller.java

import java.util.Comparator

/** A `java.util.Comparator` that is also `java.io.Serializable`.
  *
  * @tparam T Type of the values being compared.
  */
trait SerializableComparator[T] extends Comparator[T] with java.io.Serializable

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/package.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/controller/package.scala b/core/src/main/scala/io/prediction/controller/package.scala
// deleted file mode 100644
// index bcb4b0d..0000000
// --- a/core/src/main/scala/io/prediction/controller/package.scala
// +++ /dev/null
// @@ -1,168 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction

/** Provides building blocks for writing a complete prediction engine
  * consisting of DataSource, Preparator, Algorithm, Serving, and Evaluation.
  *
  * == Start Building an Engine ==
  * The starting point of a prediction engine is the [[Engine]] class.
  *
  * == The DASE Paradigm ==
  * The building blocks together form the DASE paradigm. Learn more about DASE
  * [[http://docs.prediction.io/customize/ here]].
  *
  * == Types of Building Blocks ==
  * Depending on the problem you are solving, you would need to pick appropriate
  * flavors of building blocks.
  *
  * === Engines ===
  * There are 3 typical engine configurations:
  *
  *  1. [[PDataSource]], [[PPreparator]], [[P2LAlgorithm]], [[LServing]]
  *  2. [[PDataSource]], [[PPreparator]], [[PAlgorithm]], [[LServing]]
  *  3. [[LDataSource]], [[LPreparator]], [[LAlgorithm]], [[LServing]]
  *
  * In both configurations 1 and 2, data is sourced and prepared in a
  * parallelized fashion, with data type as RDD.
  *
  * The difference between configurations 1 and 2 comes at the algorithm stage.
  * In configuration 1, the algorithm operates on potentially large data as RDDs
  * in the Spark cluster, and eventually outputs a model that is small enough to
  * fit in a single machine.
  *
  * On the other hand, configuration 2 outputs a model that is potentially too
  * large to fit in a single machine, and must reside in the Spark cluster as
  * RDD(s).
  *
  * With configuration 1 ([[P2LAlgorithm]]), PredictionIO will automatically
  * try to persist the model to local disk or HDFS if the model is serializable.
  *
  * With configuration 2 ([[PAlgorithm]]), PredictionIO will not automatically
  * try to persist the model, unless the model implements the [[PersistentModel]]
  * trait.
  *
  * In special circumstances where both the data and the model are small,
  * configuration 3 may be used. Beware that RDDs cannot be used with
  * configuration 3.
  *
  * === Data Source ===
  * [[PDataSource]] is probably the most used data source base class with the
  * ability to process RDD-based data. [[LDataSource]] '''cannot''' handle
  * RDD-based data. Use only when you have a special requirement.
  *
  * === Preparator ===
  * With [[PDataSource]], you must pick [[PPreparator]]. The same applies to
  * [[LDataSource]] and [[LPreparator]].
  *
  * === Algorithm ===
  * The workhorse of the engine comes in 3 different flavors.
  *
  * ==== P2LAlgorithm ====
  * Produces a model that is small enough to fit in a single machine from
  * [[PDataSource]] and [[PPreparator]]. The model '''cannot''' contain any RDD.
  * If the produced model is serializable, PredictionIO will try to
  * automatically persist it. In addition, P2LAlgorithm.batchPredict is
  * already implemented for [[Evaluation]] purpose.
  *
  * ==== PAlgorithm ====
  * Produces a model that could contain RDDs from [[PDataSource]] and
  * [[PPreparator]]. PredictionIO will not try to persist it automatically
  * unless the model implements [[PersistentModel]]. [[PAlgorithm.batchPredict]]
  * must be implemented for [[Evaluation]].
  *
  * ==== LAlgorithm ====
  * Produces a model that is small enough to fit in a single machine from
  * [[LDataSource]] and [[LPreparator]]. The model '''cannot''' contain any RDD.
  * If the produced model is serializable, PredictionIO will try to
  * automatically persist it. In addition, LAlgorithm.batchPredict is
  * already implemented for [[Evaluation]] purpose.
  *
  * === Serving ===
  * The serving component comes with only 1 flavor--[[LServing]]. At the serving
  * stage, it is assumed that the result being served is already at a human-
  * consumable size.
  *
  * == Model Persistence ==
  * PredictionIO tries its best to persist trained models automatically. Please
  * refer to [[LAlgorithm.makePersistentModel]],
  * [[P2LAlgorithm.makePersistentModel]], and [[PAlgorithm.makePersistentModel]]
  * for descriptions on different strategies.
  */
package object controller {

  /** Base class of several helper types that represent emptiness.
    *
    * It declares no members and only mixes in `Serializable`.
    *
    * @group Helper
    */
  class SerializableClass() extends Serializable

  /** Empty data source parameters. Alias of [[EmptyParams]].
    * @group Helper
    */
  type EmptyDataSourceParams = EmptyParams

  /** Empty data parameters. Alias of [[EmptyParams]].
    * @group Helper
    */
  type EmptyDataParams = EmptyParams

  /** Empty evaluation info. Alias of [[SerializableClass]].
    * @group Helper
    */
  type EmptyEvaluationInfo = SerializableClass

  /** Empty preparator parameters. Alias of [[EmptyParams]].
    * @group Helper
    */
  type EmptyPreparatorParams = EmptyParams

  /** Empty algorithm parameters. Alias of [[EmptyParams]].
    * @group Helper
    */
  type EmptyAlgorithmParams = EmptyParams

  /** Empty serving parameters. Alias of [[EmptyParams]].
    * @group Helper
    */
  type EmptyServingParams = EmptyParams

  /** Empty metrics parameters. Alias of [[EmptyParams]].
    * @group Helper
    */
  type EmptyMetricsParams = EmptyParams

  /** Empty training data. Alias of [[SerializableClass]].
    * @group Helper
    */
  type EmptyTrainingData = SerializableClass

  /** Empty prepared data. Alias of [[SerializableClass]].
    * @group Helper
    */
  type EmptyPreparedData = SerializableClass

  /** Empty model. Alias of [[SerializableClass]].
    * @group Helper
    */
  type EmptyModel = SerializableClass

  /** Empty actual result. Alias of [[SerializableClass]].
    * @group Helper
    */
  type EmptyActualResult = SerializableClass

}

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/AbstractDoer.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/core/AbstractDoer.scala b/core/src/main/scala/io/prediction/core/AbstractDoer.scala
// deleted file mode 100644
// index 0635b27..0000000
// --- a/core/src/main/scala/io/prediction/core/AbstractDoer.scala
// +++ /dev/null
// @@ -1,66 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */ - -package io.prediction.core - -import grizzled.slf4j.Logging -import io.prediction.annotation.DeveloperApi -import io.prediction.controller.Params - -/** :: DeveloperApi :: - * Base class for all controllers - */ -@DeveloperApi -abstract class AbstractDoer extends Serializable - -/** :: DeveloperApi :: - * Provides facility to instantiate controller classes - */ -@DeveloperApi -object Doer extends Logging { - /** :: DeveloperApi :: - * Instantiates a controller class using supplied controller parameters as - * constructor parameters - * - * @param cls Class of the controller class - * @param params Parameters of the controller class - * @tparam C Controller class - * @return An instance of the controller class - */ - @DeveloperApi - def apply[C <: AbstractDoer] ( - cls: Class[_ <: C], params: Params): C = { - - // Subclasses only allows two kind of constructors. - // 1. Constructor with P <: Params. - // 2. Emtpy constructor. - // First try (1), if failed, try (2). - try { - val constr = cls.getConstructor(params.getClass) - constr.newInstance(params) - } catch { - case e: NoSuchMethodException => try { - val zeroConstr = cls.getConstructor() - zeroConstr.newInstance() - } catch { - case e: NoSuchMethodException => - error(s"${params.getClass.getName} was used as the constructor " + - s"argument to ${e.getMessage}, but no constructor can handle it. " + - "Aborting.") - sys.exit(1) - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala b/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala deleted file mode 100644 index a3d3fad..0000000 --- a/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala +++ /dev/null @@ -1,123 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. 
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

package io.prediction.core

import com.google.gson.TypeAdapterFactory
import io.prediction.annotation.DeveloperApi
import io.prediction.controller.Params
import io.prediction.controller.Utils
import net.jodah.typetools.TypeResolver
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/** :: DeveloperApi ::
  * Base trait with default custom query serializer, exposed to engine developer
  * via [[io.prediction.controller.CustomQuerySerializer]]
  */
@DeveloperApi
trait BaseQuerySerializer {
  /** :: DeveloperApi ::
    * Serializer for Scala query classes using
    * [[io.prediction.controller.Utils.json4sDefaultFormats]]
    *
    * Declared `@transient lazy`, so it is not serialized with the instance
    * and is evaluated on first access.
    */
  @DeveloperApi
  @transient lazy val querySerializer = Utils.json4sDefaultFormats

  /** :: DeveloperApi ::
    * Serializer for Java query classes using Gson
    *
    * Defaults to an empty sequence of `TypeAdapterFactory`.
    */
  @DeveloperApi
  @transient lazy val gsonTypeAdapterFactories = Seq.empty[TypeAdapterFactory]
}

/** :: DeveloperApi ::
  * Base class of all algorithm controllers
  *
  * @tparam PD Prepared data class
  * @tparam M Model class
  * @tparam Q Query class
  * @tparam P Predicted result class
  */
@DeveloperApi
abstract class BaseAlgorithm[PD, M, Q, P]
  extends AbstractDoer with BaseQuerySerializer {
  /** :: DeveloperApi ::
    * Engine developers should not use this directly. This is called by workflow
    * to train a model.
    *
    * @param sc Spark context
    * @param pd Prepared data
    * @return Trained model
    */
  @DeveloperApi
  def trainBase(sc: SparkContext, pd: PD): M

  /** :: DeveloperApi ::
    * Engine developers should not use this directly. This is called by
    * evaluation workflow to perform batch prediction.
    *
    * @param sc Spark context
    * @param bm Model
    * @param qs Batch of queries, each paired with a `Long`
    * @return Batch of predicted results, each paired with a `Long`
    */
  @DeveloperApi
  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
  : RDD[(Long, P)]

  /** :: DeveloperApi ::
    * Engine developers should not use this directly. Called by serving to
    * perform a single prediction.
    *
    * @param bm Model
    * @param q Query
    * @return Predicted result
    */
  @DeveloperApi
  def predictBase(bm: Any, q: Q): P

  /** :: DeveloperApi ::
    * Engine developers should not use this directly. Prepare a model for
    * persistence in the downstream consumer. PredictionIO supports 3 types of
    * model persistence: automatic persistence, manual persistence, and
    * re-training on deployment. This method provides a way for downstream
    * modules to determine which mode the model should be persisted.
    *
    * This default implementation ignores its arguments and returns the
    * expression `Unit`, i.e. the re-training-on-deployment case.
    * NOTE(review): `Unit` in expression position denotes the `scala.Unit`
    * companion object rather than the unit value `()`; confirm what
    * downstream code matches on before changing it.
    *
    * @param sc Spark context
    * @param modelId Model ID
    * @param algoParams Algorithm parameters that trained this model
    * @param bm Model
    * @return The model itself for automatic persistence, an instance of
    *         [[io.prediction.workflow.PersistentModelManifest]] for manual
    *         persistence, or Unit for re-training on deployment
    */
  @DeveloperApi
  def makePersistentModel(
    sc: SparkContext,
    modelId: String,
    algoParams: Params,
    bm: Any): Any = Unit

  /** :: DeveloperApi ::
    * Obtains the type signature of query for this algorithm
    *
    * Resolves the raw type arguments that the runtime class supplies to
    * `BaseAlgorithm[PD, M, Q, P]` and returns the one at index 2, the
    * position of `Q`.
    *
    * @return Type signature of query
    */
  def queryClass: Class[Q] = {
    val types = TypeResolver.resolveRawArguments(classOf[BaseAlgorithm[PD, M, Q, P]], getClass)
    types(2).asInstanceOf[Class[Q]]
  }
}

// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseDataSource.scala
// ----------------------------------------------------------------------
// diff --git a/core/src/main/scala/io/prediction/core/BaseDataSource.scala b/core/src/main/scala/io/prediction/core/BaseDataSource.scala
// deleted file mode 100644
// index dd1157d..0000000
// --- a/core/src/main/scala/io/prediction/core/BaseDataSource.scala
// +++ /dev/null
// @@ -1,52 +0,0 @@
/** Copyright 2015 TappingStone, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */ - -package io.prediction.core - -import io.prediction.annotation.DeveloperApi -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -/** :: DeveloperApi :: - * Base class of all data source controllers - * - * @tparam TD Training data class - * @tparam EI Evaluation information class - * @tparam Q Query class - * @tparam A Actual result class - */ -@DeveloperApi -abstract class BaseDataSource[TD, EI, Q, A] extends AbstractDoer { - /** :: DeveloperApi :: - * Engine developer should not use this directly. This is called by workflow - * to read training data. - * - * @param sc Spark context - * @return Training data - */ - @DeveloperApi - def readTrainingBase(sc: SparkContext): TD - - /** :: DeveloperApi :: - * Engine developer should not use this directly. This is called by - * evaluation workflow to read training and validation data. - * - * @param sc Spark context - * @return Sets of training data, evaluation information, queries, and actual - * results - */ - @DeveloperApi - def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/core/BaseEngine.scala b/core/src/main/scala/io/prediction/core/BaseEngine.scala deleted file mode 100644 index 5356fa7..0000000 --- a/core/src/main/scala/io/prediction/core/BaseEngine.scala +++ /dev/null @@ -1,100 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.core - -import io.prediction.annotation.DeveloperApi -import io.prediction.controller.EngineParams -import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption -import io.prediction.workflow.WorkflowParams -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.json4s.JValue - -/** :: DeveloperApi :: - * Base class of all engine controller classes - * - * @tparam EI Evaluation information class - * @tparam Q Query class - * @tparam P Predicted result class - * @tparam A Actual result class - */ -@DeveloperApi -abstract class BaseEngine[EI, Q, P, A] extends Serializable { - /** :: DeveloperApi :: - * Implement this method so that training this engine would return a list of - * models. - * - * @param sc An instance of SparkContext. - * @param engineParams An instance of [[EngineParams]] for running a single training. - * @param params An instance of [[WorkflowParams]] that controls the workflow. - * @return A list of models. - */ - @DeveloperApi - def train( - sc: SparkContext, - engineParams: EngineParams, - engineInstanceId: String, - params: WorkflowParams): Seq[Any] - - /** :: DeveloperApi :: - * Implement this method so that [[io.prediction.controller.Evaluation]] can - * use this method to generate inputs for [[io.prediction.controller.Metric]]. - * - * @param sc An instance of SparkContext. - * @param engineParams An instance of [[EngineParams]] for running a single evaluation. - * @param params An instance of [[WorkflowParams]] that controls the workflow. 
- * @return A list of (evaluation information, RDD) pairs, where each RDD holds - * (query, predicted result, actual result) tuples. - */ - @DeveloperApi - def eval( - sc: SparkContext, - engineParams: EngineParams, - params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])] - - /** :: DeveloperApi :: - * Override this method to further optimize the process that runs multiple - * evaluations (during tuning, for example). By default, this method calls - * [[eval]] for each element in the engine parameters list. - * - * @param sc An instance of SparkContext. - * @param engineParamsList A list of [[EngineParams]] for running batch evaluation. - * @param params An instance of [[WorkflowParams]] that controls the workflow. - * @return A list of engine parameters and evaluation result (from [[eval]]) tuples. - */ - @DeveloperApi - def batchEval( - sc: SparkContext, - engineParamsList: Seq[EngineParams], - params: WorkflowParams) - : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = { - engineParamsList.map { engineParams => - (engineParams, eval(sc, engineParams, params)) - } - } - - /** :: DeveloperApi :: - * Implement this method to convert a JValue (read from an engine variant - * JSON file) to an instance of [[EngineParams]]. - * - * @param variantJson Content of the engine variant JSON as JValue. - * @param jsonExtractor JSON extractor option to use for the conversion. - * @return An instance of [[EngineParams]] converted from JSON.
- */ - @DeveloperApi - def jValueToEngineParams(variantJson: JValue, jsonExtractor: JsonExtractorOption): EngineParams = - throw new NotImplementedError("JSON to EngineParams is not implemented.") -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/core/BaseEvaluator.scala b/core/src/main/scala/io/prediction/core/BaseEvaluator.scala deleted file mode 100644 index 23fe826..0000000 --- a/core/src/main/scala/io/prediction/core/BaseEvaluator.scala +++ /dev/null @@ -1,72 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.prediction.core - -import io.prediction.annotation.DeveloperApi -import io.prediction.annotation.Experimental -import io.prediction.controller.EngineParams -import io.prediction.controller.Evaluation -import io.prediction.workflow.WorkflowParams -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -/** :: DeveloperApi :: - * Base class of all evaluator controller classes - * - * @tparam EI Evaluation information class - * @tparam Q Query class - * @tparam P Predicted result class - * @tparam A Actual result class - * @tparam ER Evaluation result class - */ -@DeveloperApi -abstract class BaseEvaluator[EI, Q, P, A, ER <: BaseEvaluatorResult] - extends AbstractDoer { - /** :: DeveloperApi :: - * Engine developers should not use this directly. This is called by - * evaluation workflow to perform evaluation. - * - * @param sc Spark context - * @param evaluation Evaluation to run - * @param engineEvalDataSet Sets of engine parameters and data for evaluation - * @param params Evaluation workflow parameters - * @return Evaluation result - */ - @DeveloperApi - def evaluateBase( - sc: SparkContext, - evaluation: Evaluation, - engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])], - params: WorkflowParams): ER -} - -/** Base trait of evaluator result */ -trait BaseEvaluatorResult extends Serializable { - /** A short description of the result */ - def toOneLiner(): String = "" - - /** HTML portion of the rendered evaluator results */ - def toHTML(): String = "" - - /** JSON portion of the rendered evaluator results */ - def toJSON(): String = "" - - /** :: Experimental :: - * Indicate if this result is inserted into database - */ - @Experimental - val noSave: Boolean = false -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BasePreparator.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/io/prediction/core/BasePreparator.scala b/core/src/main/scala/io/prediction/core/BasePreparator.scala deleted file mode 100644 index d6d0e45..0000000 --- a/core/src/main/scala/io/prediction/core/BasePreparator.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.core - -import io.prediction.annotation.DeveloperApi -import org.apache.spark.SparkContext - -/** :: DeveloperApi :: - * Base class of all preparator controller classes - * - * Dev note: Probably will add an extra parameter for ad hoc JSON formatter - * - * @tparam TD Training data class - * @tparam PD Prepared data class - */ -@DeveloperApi -abstract class BasePreparator[TD, PD] - extends AbstractDoer { - /** :: DeveloperApi :: - * Engine developers should not use this directly. 
This is called by training - * workflow to prepare data before handing it over to the algorithm - * - * @param sc Spark context - * @param td Training data - * @return Prepared data - */ - @DeveloperApi - def prepareBase(sc: SparkContext, td: TD): PD -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseServing.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/core/BaseServing.scala b/core/src/main/scala/io/prediction/core/BaseServing.scala deleted file mode 100644 index d8bde9e..0000000 --- a/core/src/main/scala/io/prediction/core/BaseServing.scala +++ /dev/null @@ -1,51 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.core - -import io.prediction.annotation.DeveloperApi -import io.prediction.annotation.Experimental - -/** :: DeveloperApi :: - * Base class of all serving controller classes - * - * @tparam Q Query class - * @tparam P Predicted result class - */ -@DeveloperApi -abstract class BaseServing[Q, P] - extends AbstractDoer { - /** :: Experimental :: - * Engine developers should not use this directly. This is called by serving - * layer to supplement the query before sending it to algorithms.
- * - * @param q Query - * @return A supplemented query - */ - @Experimental - def supplementBase(q: Q): Q - - /** :: DeveloperApi :: - * Engine developers should not use this directly. This is called by serving - * layer to combine multiple predicted results from multiple algorithms, and - * to apply custom business logic before serving to the end user. - * - * @param q Query - * @param ps List of predicted results - * @return A single predicted result - */ - @DeveloperApi - def serveBase(q: Q, ps: Seq[P]): P -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/core/package.scala b/core/src/main/scala/io/prediction/core/package.scala deleted file mode 100644 index c7586c7..0000000 --- a/core/src/main/scala/io/prediction/core/package.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction - -/** Core base classes of PredictionIO controller components. Engine developers - * should not use these directly.
- */ -package object core {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/package.scala b/core/src/main/scala/io/prediction/package.scala deleted file mode 100644 index 3e3cc80..0000000 --- a/core/src/main/scala/io/prediction/package.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io - -/** PredictionIO Scala API */ -package object prediction {} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala b/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala deleted file mode 100644 index ad93b1a..0000000 --- a/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala +++ /dev/null @@ -1,163 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import io.prediction.controller.EngineParams -import io.prediction.controller.Evaluation -import io.prediction.core.BaseEngine -import io.prediction.core.BaseEvaluator -import io.prediction.core.BaseEvaluatorResult -import io.prediction.data.storage.EngineInstance -import io.prediction.data.storage.EvaluationInstance -import io.prediction.data.storage.Model -import io.prediction.data.storage.Storage - -import com.github.nscala_time.time.Imports.DateTime -import grizzled.slf4j.Logger - -import scala.language.existentials - -/** CoreWorkflow handles PredictionIO metadata and environment variables of - * training and evaluation. 
- */ -object CoreWorkflow { - @transient lazy val logger = Logger[this.type] - @transient lazy val engineInstances = Storage.getMetaDataEngineInstances - @transient lazy val evaluationInstances = - Storage.getMetaDataEvaluationInstances() - - def runTrain[EI, Q, P, A]( - engine: BaseEngine[EI, Q, P, A], - engineParams: EngineParams, - engineInstance: EngineInstance, - env: Map[String, String] = WorkflowUtils.pioEnvVars, - params: WorkflowParams = WorkflowParams()) { - logger.debug("Starting SparkContext") - val mode = "training" - WorkflowUtils.checkUpgrade(mode, engineInstance.engineFactory) - - val batch = if (params.batch.nonEmpty) { - s"{engineInstance.engineFactory} (${params.batch}})" - } else { - engineInstance.engineFactory - } - val sc = WorkflowContext( - batch, - env, - params.sparkEnv, - mode.capitalize) - - try { - - val models: Seq[Any] = engine.train( - sc = sc, - engineParams = engineParams, - engineInstanceId = engineInstance.id, - params = params - ) - - val instanceId = Storage.getMetaDataEngineInstances - - val kryo = KryoInstantiator.newKryoInjection - - logger.info("Inserting persistent model") - Storage.getModelDataModels.insert(Model( - id = engineInstance.id, - models = kryo(models))) - - logger.info("Updating engine instance") - val engineInstances = Storage.getMetaDataEngineInstances - engineInstances.update(engineInstance.copy( - status = "COMPLETED", - endTime = DateTime.now - )) - - logger.info("Training completed successfully.") - } catch { - case e @( - _: StopAfterReadInterruption | - _: StopAfterPrepareInterruption) => { - logger.info(s"Training interrupted by $e.") - } - } finally { - logger.debug("Stopping SparkContext") - sc.stop() - } - } - - def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult]( - evaluation: Evaluation, - engine: BaseEngine[EI, Q, P, A], - engineParamsList: Seq[EngineParams], - evaluationInstance: EvaluationInstance, - evaluator: BaseEvaluator[EI, Q, P, A, R], - env: Map[String, String] = 
WorkflowUtils.pioEnvVars, - params: WorkflowParams = WorkflowParams()) { - logger.info("runEvaluation started") - logger.debug("Start SparkContext") - - val mode = "evaluation" - - WorkflowUtils.checkUpgrade(mode, engine.getClass.getName) - - val batch = if (params.batch.nonEmpty) { - s"{evaluation.getClass.getName} (${params.batch}})" - } else { - evaluation.getClass.getName - } - val sc = WorkflowContext( - batch, - env, - params.sparkEnv, - mode.capitalize) - val evaluationInstanceId = evaluationInstances.insert(evaluationInstance) - - logger.info(s"Starting evaluation instance ID: $evaluationInstanceId") - - val evaluatorResult: BaseEvaluatorResult = EvaluationWorkflow.runEvaluation( - sc, - evaluation, - engine, - engineParamsList, - evaluator, - params) - - if (evaluatorResult.noSave) { - logger.info(s"This evaluation result is not inserted into database: $evaluatorResult") - } else { - val evaluatedEvaluationInstance = evaluationInstance.copy( - status = "EVALCOMPLETED", - id = evaluationInstanceId, - endTime = DateTime.now, - evaluatorResults = evaluatorResult.toOneLiner, - evaluatorResultsHTML = evaluatorResult.toHTML, - evaluatorResultsJSON = evaluatorResult.toJSON - ) - - logger.info(s"Updating evaluation instance with result: $evaluatorResult") - - evaluationInstances.update(evaluatedEvaluationInstance) - } - - logger.debug("Stop SparkContext") - - sc.stop() - - logger.info("runEvaluation completed") - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/CreateServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/prediction/workflow/CreateServer.scala b/core/src/main/scala/io/prediction/workflow/CreateServer.scala deleted file mode 100644 index a664187..0000000 --- a/core/src/main/scala/io/prediction/workflow/CreateServer.scala +++ /dev/null @@ -1,737 +0,0 @@ -/** Copyright 2015 TappingStone, Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.prediction.workflow - -import java.io.PrintWriter -import java.io.Serializable -import java.io.StringWriter -import java.util.concurrent.TimeUnit - -import akka.actor._ -import akka.event.Logging -import akka.io.IO -import akka.pattern.ask -import akka.util.Timeout -import com.github.nscala_time.time.Imports.DateTime -import com.twitter.bijection.Injection -import com.twitter.chill.KryoBase -import com.twitter.chill.KryoInjection -import com.twitter.chill.ScalaKryoInstantiator -import com.typesafe.config.ConfigFactory -import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer -import grizzled.slf4j.Logging -import io.prediction.authentication.KeyAuthentication -import io.prediction.configuration.SSLConfiguration -import io.prediction.controller.Engine -import io.prediction.controller.Params -import io.prediction.controller.Utils -import io.prediction.controller.WithPrId -import io.prediction.core.BaseAlgorithm -import io.prediction.core.BaseServing -import io.prediction.core.Doer -import io.prediction.data.storage.EngineInstance -import io.prediction.data.storage.EngineManifest -import io.prediction.data.storage.Storage -import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption -import org.json4s._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.write -import spray.can.Http -import spray.can.server.ServerSettings -import 
spray.http.MediaTypes._ -import spray.http._ -import spray.httpx.Json4sSupport -import spray.routing._ -import spray.routing.authentication.{UserPass, BasicAuth} - -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.concurrent.future -import scala.language.existentials -import scala.util.Failure -import scala.util.Random -import scala.util.Success -import scalaj.http.HttpOptions - -class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator { - override def newKryo(): KryoBase = { - val kryo = super.newKryo() - kryo.setClassLoader(classLoader) - SynchronizedCollectionsSerializer.registerSerializers(kryo) - kryo - } -} - -object KryoInstantiator extends Serializable { - def newKryoInjection : Injection[Any, Array[Byte]] = { - val kryoInstantiator = new KryoInstantiator(getClass.getClassLoader) - KryoInjection.instance(kryoInstantiator) - } -} - -case class ServerConfig( - batch: String = "", - engineInstanceId: String = "", - engineId: Option[String] = None, - engineVersion: Option[String] = None, - engineVariant: String = "", - env: Option[String] = None, - ip: String = "0.0.0.0", - port: Int = 8000, - feedback: Boolean = false, - eventServerIp: String = "0.0.0.0", - eventServerPort: Int = 7070, - accessKey: Option[String] = None, - logUrl: Option[String] = None, - logPrefix: Option[String] = None, - logFile: Option[String] = None, - verbose: Boolean = false, - debug: Boolean = false, - jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) - -case class StartServer() -case class BindServer() -case class StopServer() -case class ReloadServer() -case class UpgradeCheck() - - -object CreateServer extends Logging { - val actorSystem = ActorSystem("pio-server") - val engineInstances = Storage.getMetaDataEngineInstances - val engineManifests = Storage.getMetaDataEngineManifests - val modeldata = Storage.getModelDataModels - - def main(args: Array[String]): 
Unit = { - val parser = new scopt.OptionParser[ServerConfig]("CreateServer") { - opt[String]("batch") action { (x, c) => - c.copy(batch = x) - } text("Batch label of the deployment.") - opt[String]("engineId") action { (x, c) => - c.copy(engineId = Some(x)) - } text("Engine ID.") - opt[String]("engineVersion") action { (x, c) => - c.copy(engineVersion = Some(x)) - } text("Engine version.") - opt[String]("engine-variant") required() action { (x, c) => - c.copy(engineVariant = x) - } text("Engine variant JSON.") - opt[String]("ip") action { (x, c) => - c.copy(ip = x) - } - opt[String]("env") action { (x, c) => - c.copy(env = Some(x)) - } text("Comma-separated list of environmental variables (in 'FOO=BAR' " + - "format) to pass to the Spark execution environment.") - opt[Int]("port") action { (x, c) => - c.copy(port = x) - } text("Port to bind to (default: 8000).") - opt[String]("engineInstanceId") required() action { (x, c) => - c.copy(engineInstanceId = x) - } text("Engine instance ID.") - opt[Unit]("feedback") action { (_, c) => - c.copy(feedback = true) - } text("Enable feedback loop to event server.") - opt[String]("event-server-ip") action { (x, c) => - c.copy(eventServerIp = x) - } - opt[Int]("event-server-port") action { (x, c) => - c.copy(eventServerPort = x) - } text("Event server port. 
Default: 7070") - opt[String]("accesskey") action { (x, c) => - c.copy(accessKey = Some(x)) - } text("Event server access key.") - opt[String]("log-url") action { (x, c) => - c.copy(logUrl = Some(x)) - } - opt[String]("log-prefix") action { (x, c) => - c.copy(logPrefix = Some(x)) - } - opt[String]("log-file") action { (x, c) => - c.copy(logFile = Some(x)) - } - opt[Unit]("verbose") action { (x, c) => - c.copy(verbose = true) - } text("Enable verbose output.") - opt[Unit]("debug") action { (x, c) => - c.copy(debug = true) - } text("Enable debug output.") - opt[String]("json-extractor") action { (x, c) => - c.copy(jsonExtractor = JsonExtractorOption.withName(x)) - } - } - - parser.parse(args, ServerConfig()) map { sc => - WorkflowUtils.modifyLogging(sc.verbose) - engineInstances.get(sc.engineInstanceId) map { engineInstance => - val engineId = sc.engineId.getOrElse(engineInstance.engineId) - val engineVersion = sc.engineVersion.getOrElse( - engineInstance.engineVersion) - engineManifests.get(engineId, engineVersion) map { manifest => - val engineFactoryName = engineInstance.engineFactory - val upgrade = actorSystem.actorOf(Props( - classOf[UpgradeActor], - engineFactoryName)) - actorSystem.scheduler.schedule( - 0.seconds, - 1.days, - upgrade, - UpgradeCheck()) - val master = actorSystem.actorOf(Props( - classOf[MasterActor], - sc, - engineInstance, - engineFactoryName, - manifest), - "master") - implicit val timeout = Timeout(5.seconds) - master ? StartServer() - actorSystem.awaitTermination - } getOrElse { - error(s"Invalid engine ID or version. Aborting server.") - } - } getOrElse { - error(s"Invalid engine instance ID. 
Aborting server.") - } - } - } -

  /** Builds the runtime pieces of a deployed engine and starts the
    * [[ServerActor]] that answers HTTP requests for it.
    *
    * The persisted models of `engineInstance` are read from `modeldata`,
    * deserialized with Kryo and handed to `engine.prepareDeploy`; algorithm and
    * serving instances are created through `Doer` from the engine parameters.
    *
    * @param sc Server configuration.
    * @param engineInstance Engine instance whose models are served.
    * @param engine Deployable engine.
    * @param engineLanguage Language the engine was written in.
    * @param manifest Engine manifest.
    * @return Reference to the newly created [[ServerActor]].
    * @throws NoSuchElementException if no model data is stored for
    *         `engineInstance.id`.
    */
  def createServerActorWithEngine[TD, EIN, PD, Q, P, A](
    sc: ServerConfig,
    engineInstance: EngineInstance,
    engine: Engine[TD, EIN, PD, Q, P, A],
    engineLanguage: EngineLanguage.Value,
    manifest: EngineManifest): ActorRef = {

    val engineParams = engine.engineInstanceToEngineParams(engineInstance, sc.jsonExtractor)

    val kryo = KryoInstantiator.newKryoInjection

    // Fail with an explicit message instead of a bare `None.get`.
    val persistedModel = modeldata.get(engineInstance.id).getOrElse {
      throw new NoSuchElementException(
        s"No model data found for engine instance ${engineInstance.id}.")
    }
    val modelsFromEngineInstance =
      kryo.invert(persistedModel.models).get.asInstanceOf[Seq[Any]]

    val batch = if (engineInstance.batch.nonEmpty) {
      s"${engineInstance.engineFactory} (${engineInstance.batch})"
    } else {
      engineInstance.engineFactory
    }

    val sparkContext = WorkflowContext(
      batch = batch,
      executorEnv = engineInstance.env,
      mode = "Serving",
      sparkEnv = engineInstance.sparkConf)

    val models = engine.prepareDeploy(
      sparkContext,
      engineParams,
      engineInstance.id,
      modelsFromEngineInstance,
      params = WorkflowParams()
    )

    val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
      Doer(engine.algorithmClassMap(n), p)
    }

    val servingParamsWithName = engineParams.servingParams

    val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
      servingParamsWithName._2)

    actorSystem.actorOf(
      Props(
        classOf[ServerActor[Q, P]],
        sc,
        engineInstance,
        engine,
        engineLanguage,
        manifest,
        engineParams.dataSourceParams._2,
        engineParams.preparatorParams._2,
        algorithms,
        engineParams.algorithmParamsList.map(_._2),
        models,
        serving,
        engineParams.servingParams._2))
  }
}

/** Actor that runs `WorkflowUtils.checkUpgrade` for the "deployment" component
  * each time it receives an `UpgradeCheck` message.
  *
  * @param engineClass Engine factory class name passed to the upgrade check.
  */
class UpgradeActor(engineClass: String) extends Actor {
  val log = Logging(context.system, this)
  implicit val system = context.system
  def receive: Actor.Receive = {
    case x: UpgradeCheck =>
      WorkflowUtils.checkUpgrade("deployment", engineClass)
  }
}

/** Supervising actor of an engine server. It creates the [[ServerActor]], binds
  * it to the configured interface and port over SSL, and reacts to
  * `StartServer`, `BindServer`, `StopServer` and `ReloadServer` commands as
  * well as to the `Http.Bound` / `Http.CommandFailed` replies of the HTTP layer.
  *
  * @param sc Server configuration (interface, port, ...).
  * @param engineInstance Engine instance to serve initially.
  * @param engineFactoryName Class name of the engine factory.
  * @param manifest Engine manifest.
  */
class MasterActor (
    sc: ServerConfig,
    engineInstance: EngineInstance,
    engineFactoryName: String,
    manifest: EngineManifest) extends Actor with SSLConfiguration with KeyAuthentication {
  val log = Logging(context.system, this)
  implicit val system = context.system
  var sprayHttpListener: Option[ActorRef] = None
  var currentServerActor: Option[ActorRef] = None
  var retry = 3

  /** Asks whatever is listening at `https://ip:port` to stop by POSTing to its
    * `/stop` endpoint. Failures are logged and never propagated.
    *
    * @param ip Interface of the server to undeploy.
    * @param port Port of the server to undeploy.
    */
  def undeploy(ip: String, port: Int): Unit = {
    import scala.util.control.NonFatal

    val serverUrl = s"https://${ip}:${port}"
    log.info(
      s"Undeploying any existing engine instance at $serverUrl")
    try {
      val code = scalaj.http.Http(s"$serverUrl/stop")
        .option(HttpOptions.allowUnsafeSSL)
        .param(ServerKey.param, ServerKey.get)
        .method("POST").asString.code
      code match {
        case 200 => ()
        case 404 => log.error(
          s"Another process is using $serverUrl. Unable to undeploy.")
        case _ => log.error(
          s"Another process is using $serverUrl, or an existing " +
          s"engine server is not responding properly (HTTP $code). " +
          "Unable to undeploy.")
      }
    } catch {
      case e: java.net.ConnectException =>
        log.warning(s"Nothing at $serverUrl")
      // Only non-fatal problems are swallowed; fatal JVM errors propagate.
      case NonFatal(_) =>
        log.error("Another process might be occupying " +
          s"$ip:$port. Unable to undeploy.")
    }
  }

  /** Requests an SSL-enabled HTTP binding of `handler` on the configured
    * interface and port. The outcome arrives later as `Http.Bound` or
    * `Http.CommandFailed`.
    */
  private def bindServer(handler: ActorRef): Unit = {
    val settings = ServerSettings(system)
    IO(Http) ! Http.Bind(
      handler,
      interface = sc.ip,
      port = sc.port,
      settings = Some(settings.copy(sslEncryption = true)))
  }

  def receive: Actor.Receive = {
    case x: StartServer =>
      val actor = createServerActor(
        sc,
        engineInstance,
        engineFactoryName,
        manifest)
      currentServerActor = Some(actor)
      undeploy(sc.ip, sc.port)
      self ! BindServer()
    case x: BindServer =>
      currentServerActor match {
        case Some(actor) => bindServer(actor)
        case None => log.error("Cannot bind a non-existing server backend.")
      }
    case x: StopServer =>
      log.info(s"Stop server command received.")
      sprayHttpListener match {
        case Some(listener) =>
          log.info("Server is shutting down.")
          listener ! Http.Unbind(5.seconds)
          system.shutdown()
        case None =>
          log.warning("No active server is running.")
      }
    case x: ReloadServer =>
      log.info("Reload server command received.")
      val latestEngineInstance =
        CreateServer.engineInstances.getLatestCompleted(
          manifest.id,
          manifest.version,
          engineInstance.engineVariant)
      (latestEngineInstance, sprayHttpListener) match {
        case (None, _) =>
          log.warning(
            s"No latest completed engine instance for ${manifest.id} " +
            s"${manifest.version}. Abort reloading.")
        case (Some(_), None) =>
          // Checked before building the replacement so that no server actor
          // is created (and left running) when there is nothing to rebind.
          log.warning("No active server is running. Abort reloading.")
        case (Some(latest), Some(listener)) =>
          val actor = createServerActor(sc, latest, engineFactoryName, manifest)
          listener ! Http.Unbind(5.seconds)
          bindServer(actor)
          currentServerActor.foreach(_ ! Kill)
          currentServerActor = Some(actor)
      }
    case x: Http.Bound =>
      val serverUrl = s"https://${sc.ip}:${sc.port}"
      log.info(s"Engine is deployed and running. Engine API is live at ${serverUrl}.")
      sprayHttpListener = Some(sender)
    case x: Http.CommandFailed =>
      if (retry > 0) {
        retry -= 1
        log.error(s"Bind failed. Retrying... ($retry more trial(s))")
        context.system.scheduler.scheduleOnce(1.seconds) {
          self ! BindServer()
        }
      } else {
        log.error("Bind failed. Shutting down.")
        system.shutdown()
      }
  }

  /** Instantiates the engine named by `engineFactoryName` and creates a
    * [[ServerActor]] for it through `CreateServer.createServerActorWithEngine`.
    *
    * @throws NoSuchMethodException if the factory does not produce an `Engine`.
    */
  def createServerActor(
    sc: ServerConfig,
    engineInstance: EngineInstance,
    engineFactoryName: String,
    manifest: EngineManifest): ActorRef = {
    val (engineLanguage, engineFactory) =
      WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)

    // EngineFactory returns a base engine, which may not be deployable.
    val deployableEngine: Engine[_, _, _, _, _, _] = engineFactory() match {
      case deployable: Engine[_, _, _, _, _, _] => deployable
      case other =>
        throw new NoSuchMethodException(s"Engine $other is not deployable")
    }

    CreateServer.createServerActorWithEngine(
      sc,
      engineInstance,
      deployableEngine,
      engineLanguage,
      manifest)
  }
}

/** HTTP front end of a deployed engine.
  *
  * Routes:
  *  - `GET /` renders the HTML status page;
  *  - `POST /queries.json` runs a query through serving and algorithms;
  *  - `POST /reload` and `POST /stop` (authenticated) forward the command to
  *    the actor at `/user/master`;
  *  - `/assets/...` serves static resources;
  *  - `GET /plugins.json` lists plugins, `GET /plugins/...` delegates to the
  *    plugins actor.
  */
class ServerActor[Q, P](
    val args: ServerConfig,
    val engineInstance: EngineInstance,
    val engine: Engine[_, _, _, Q, P, _],
    val engineLanguage: EngineLanguage.Value,
    val manifest: EngineManifest,
    val dataSourceParams: Params,
    val preparatorParams: Params,
    val algorithms: Seq[BaseAlgorithm[_, _, Q, P]],
    val algorithmsParams: Seq[Params],
    val models: Seq[Any],
    val serving: BaseServing[Q, P],
    val servingParams: Params) extends Actor with HttpService with KeyAuthentication {
  val serverStartTime = DateTime.now
  val log = Logging(context.system, this)

  var requestCount: Int = 0
  var avgServingSec: Double = 0.0
  var lastServingSec: Double = 0.0

  /** The following is required by HttpService */
  def actorRefFactory: ActorContext = context

  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val pluginsActorRef =
    context.actorOf(Props(classOf[PluginsActor], args.engineVariant), "PluginsActor")
  val pluginContext = EngineServerPluginContext(log, args.engineVariant)

  // Resolved once while the actor is being constructed so that route and
  // scheduler callbacks do not have to touch `context` later on.
  private val masterSelection = context.actorSelection("/user/master")

  def receive: Actor.Receive = runRoute(myRoute)

  /** True only when feedback was requested and an access key is available. */
  val feedbackEnabled = if (args.feedback) {
    if (args.accessKey.isEmpty) {
      log.error("Feedback loop cannot be enabled because accessKey is empty.")
      false
    } else {
      true
    }
  } else false

  /** POSTs `logPrefix` followed by a JSON document holding the engine instance
    * and `message` to `logUrl`. Non-fatal failures are logged locally only.
    */
  def remoteLog(logUrl: String, logPrefix: String, message: String): Unit = {
    import scala.util.control.NonFatal

    implicit val formats = Utils.json4sDefaultFormats
    try {
      scalaj.http.Http(logUrl).postData(
        logPrefix + write(Map(
          "engineInstance" -> engineInstance,
          "message" -> message))).asString
    } catch {
      case NonFatal(e) =>
        log.error(s"Unable to send remote log: ${e.getMessage}")
    }
  }

  /** Renders the stack trace of `e` as a string. */
  def getStackTraceString(e: Throwable): String = {
    val writer = new StringWriter()
    val printWriter = new PrintWriter(writer)
    e.printStackTrace(printWriter)
    printWriter.flush()
    writer.toString
  }

  val myRoute =
    path("") {
      get {
        respondWithMediaType(`text/html`) {
          detach() {
            complete {
              html.index(
                args,
                manifest,
                engineInstance,
                algorithms.map(_.toString),
                algorithmsParams.map(_.toString),
                models.map(_.toString),
                dataSourceParams.toString,
                preparatorParams.toString,
                servingParams.toString,
                serverStartTime,
                feedbackEnabled,
                args.eventServerIp,
                args.eventServerPort,
                requestCount,
                avgServingSec,
                lastServingSec
              ).toString
            }
          }
        }
      }
    } ~
    path("queries.json") {
      post {
        detach() {
          entity(as[String]) { queryString =>
            try {
              val servingStartTime = DateTime.now
              val jsonExtractorOption = args.jsonExtractor
              val queryTime = DateTime.now
              // Extract Query from Json
              val query = JsonExtractor.extract(
                jsonExtractorOption,
                queryString,
                algorithms.head.queryClass,
                algorithms.head.querySerializer,
                algorithms.head.gsonTypeAdapterFactories
              )
              val queryJValue = JsonExtractor.toJValue(
                jsonExtractorOption,
                query,
                algorithms.head.querySerializer,
                algorithms.head.gsonTypeAdapterFactories)
              // Deploy logic. First call Serving.supplement, then Algo.predict,
              // finally Serving.serve.
              val supplementedQuery = serving.supplementBase(query)
              // TODO: Parallelize the following.
              val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
                a.predictBase(models(ai), supplementedQuery)
              }
              // Notice that it is by design to call Serving.serve with the
              // *original* query.
              val prediction = serving.serveBase(query, predictions)
              val predictionJValue = JsonExtractor.toJValue(
                jsonExtractorOption,
                prediction,
                algorithms.head.querySerializer,
                algorithms.head.gsonTypeAdapterFactories)
              /** Handle feedback to Event Server
                * Send the following back to the Event Server
                * - appId
                * - engineInstanceId
                * - query
                * - prediction
                * - prId
                */
              val result = if (feedbackEnabled) {
                implicit val formats =
                  algorithms.headOption map { alg =>
                    alg.querySerializer
                  } getOrElse {
                    Utils.json4sDefaultFormats
                  }
                def genPrId: String = Random.alphanumeric.take(64).mkString
                // Keep a non-empty prId supplied by the prediction, otherwise
                // generate a fresh one.
                val newPrId = prediction match {
                  case id: WithPrId =>
                    val existing = id.prId
                    if (existing.isEmpty) genPrId else existing
                  case _ => genPrId
                }

                // also save Query's prId as prId of this pio_pr predict events
                val queryPrId =
                  query match {
                    case id: WithPrId =>
                      Map("prId" -> id.prId)
                    case _ =>
                      Map()
                  }
                val data = Map(
                  // "appId" -> dataSourceParams.asInstanceOf[ParamsWithAppId].appId,
                  "event" -> "predict",
                  "eventTime" -> queryTime.toString(),
                  "entityType" -> "pio_pr", // prediction result
                  "entityId" -> newPrId,
                  "properties" -> Map(
                    "engineInstanceId" -> engineInstance.id,
                    "query" -> query,
                    "prediction" -> prediction)) ++ queryPrId
                // At this point args.accessKey should be Some(String).
                val accessKey = args.accessKey.getOrElse("")
                val f: Future[Int] = future {
                  scalaj.http.Http(
                    s"http://${args.eventServerIp}:${args.eventServerPort}/" +
                    s"events.json?accessKey=$accessKey").postData(
                    write(data)).header(
                    "content-type", "application/json").asString.code
                }
                f onComplete {
                  case Success(code) => {
                    if (code != 201) {
                      log.error(s"Feedback event failed. Status code: $code. "
                        + s"Data: ${write(data)}.")
                    }
                  }
                  case Failure(t) => {
                    log.error(s"Feedback event failed: ${t.getMessage}") }
                }
                // overwrite prId in predictedResult
                // - if it is WithPrId,
                //   then overwrite with new prId
                // - if it is not WithPrId, no prId injection
                if (prediction.isInstanceOf[WithPrId]) {
                  // Built as an AST node rather than parsed from an
                  // interpolated string, so a prId containing quotes or
                  // backslashes cannot produce malformed JSON.
                  val prIdJson: _root_.org.json4s.JValue =
                    _root_.org.json4s.JObject(
                      List("prId" -> _root_.org.json4s.JString(newPrId)))
                  predictionJValue merge prIdJson
                } else {
                  predictionJValue
                }
              } else predictionJValue

              val pluginResult =
                pluginContext.outputBlockers.values.foldLeft(result) { case (r, p) =>
                  p.process(engineInstance, queryJValue, r, pluginContext)
                }

              // Bookkeeping
              // NOTE(review): these vars are updated from inside `detach()`;
              // confirm whether concurrent requests can interleave here.
              val servingEndTime = DateTime.now
              lastServingSec =
                (servingEndTime.getMillis - servingStartTime.getMillis) / 1000.0
              avgServingSec =
                ((avgServingSec * requestCount) + lastServingSec) /
                (requestCount + 1)
              requestCount += 1

              respondWithMediaType(`application/json`) {
                complete(compact(render(pluginResult)))
              }
            } catch {
              case e: MappingException =>
                log.error(
                  s"Query '$queryString' is invalid. Reason: ${e.getMessage}")
                args.logUrl foreach { url =>
                  remoteLog(
                    url,
                    args.logPrefix.getOrElse(""),
                    s"Query:\n$queryString\n\nStack Trace:\n" +
                      s"${getStackTraceString(e)}\n\n")
                }
                complete(StatusCodes.BadRequest, e.getMessage)
              // NOTE(review): every Throwable is turned into an HTTP 500 whose
              // body contains the stack trace; confirm this is intended.
              case e: Throwable =>
                val msg = s"Query:\n$queryString\n\nStack Trace:\n" +
                  s"${getStackTraceString(e)}\n\n"
                log.error(msg)
                args.logUrl foreach { url =>
                  remoteLog(
                    url,
                    args.logPrefix.getOrElse(""),
                    msg)
                }
                complete(StatusCodes.InternalServerError, msg)
            }
          }
        }
      }
    } ~
    path("reload") {
      authenticate(withAccessKeyFromFile) { request =>
        post {
          complete {
            masterSelection ! ReloadServer()
            "Reloading..."
          }
        }
      }
    } ~
    path("stop") {
      authenticate(withAccessKeyFromFile) { request =>
        post {
          complete {
            // The callback runs on a scheduler thread, so it only uses the
            // pre-resolved selection and never the actor's `context`.
            val scheduler = masterSelection.anchor.path.address match {
              case _ => actorRefFactory.system.scheduler
            }
            scheduler.scheduleOnce(1.seconds) {
              masterSelection ! StopServer()
            }
            "Shutting down..."
          }
        }
      }
    } ~
    pathPrefix("assets") {
      getFromResourceDirectory("assets")
    } ~
    path("plugins.json") {
      import EngineServerJson4sSupport._
      get {
        respondWithMediaType(MediaTypes.`application/json`) {
          complete {
            Map("plugins" -> Map(
              "outputblockers" -> pluginContext.outputBlockers.map { case (n, p) =>
                n -> Map(
                  "name" -> p.pluginName,
                  "description" -> p.pluginDescription,
                  "class" -> p.getClass.getName,
                  "params" -> pluginContext.pluginParams(p.pluginName))
              },
              "outputsniffers" -> pluginContext.outputSniffers.map { case (n, p) =>
                n -> Map(
                  "name" -> p.pluginName,
                  "description" -> p.pluginDescription,
                  "class" -> p.getClass.getName,
                  "params" -> pluginContext.pluginParams(p.pluginName))
              }
            ))
          }
        }
      }
    } ~
    path("plugins" / Segments) { segments =>
      segments match {
        // <plugin type>/<plugin name>/<args...>; only output sniffers are
        // handled over REST.
        case EngineServerPlugin.outputSniffer :: pluginName :: pluginArgs =>
          import EngineServerJson4sSupport._
          get {
            respondWithMediaType(MediaTypes.`application/json`) {
              complete {
                pluginsActorRef ? PluginsActor.HandleREST(
                  pluginName = pluginName,
                  pluginArgs = pluginArgs) map {
                  _.asInstanceOf[String]
                }
              }
            }
          }
        // Unknown plugin type or too few path segments: reject the request
        // instead of failing with MatchError / IndexOutOfBoundsException.
        case _ =>
          reject
      }
    }
}

/** Json4s (un)marshalling support for the engine server routes, using
  * `DefaultFormats`.
  */
object EngineServerJson4sSupport extends Json4sSupport {
  implicit def json4sFormats: Formats = DefaultFormats
}
