http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/Utils.scala 
b/core/src/main/scala/io/prediction/controller/Utils.scala
deleted file mode 100644
index 5098fba..0000000
--- a/core/src/main/scala/io/prediction/controller/Utils.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller
-
-import io.prediction.workflow.KryoInstantiator
-
-import org.json4s._
-import org.json4s.ext.JodaTimeSerializers
-
-import scala.io.Source
-
-import _root_.java.io.File
-import _root_.java.io.FileOutputStream
-
-/** Controller utilities.
-  *
-  * @group Helper
-  */
-object Utils {
-  /** Default JSON4S serializers for PredictionIO controllers. */
-  val json4sDefaultFormats = DefaultFormats.lossless ++ JodaTimeSerializers.all
-
-  /** Save a model object as a file to a temporary location on local 
filesystem.
-    * It will first try to use the location indicated by the environmental
-    * variable PIO_FS_TMPDIR, then fall back to the java.io.tmpdir property.
-    *
-    * @param id Used as the filename of the file.
-    * @param model Model object.
-    */
-  def save(id: String, model: Any): Unit = {
-    val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", 
System.getProperty("java.io.tmpdir"))
-    val modelFile = tmpdir + File.separator + id
-    (new File(tmpdir)).mkdirs
-    val fos = new FileOutputStream(modelFile)
-    val kryo = KryoInstantiator.newKryoInjection
-    fos.write(kryo(model))
-    fos.close
-  }
-
-  /** Load a model object from a file in a temporary location on local
-    * filesystem. It will first try to use the location indicated by the
-    * environmental variable PIO_FS_TMPDIR, then fall back to the 
java.io.tmpdir
-    * property.
-    *
-    * @param id Used as the filename of the file.
-    */
-  def load(id: String): Any = {
-    val tmpdir = sys.env.getOrElse("PIO_FS_TMPDIR", 
System.getProperty("java.io.tmpdir"))
-    val modelFile = tmpdir + File.separator + id
-    val src = Source.fromFile(modelFile)(scala.io.Codec.ISO8859)
-    val kryo = KryoInstantiator.newKryoInjection
-    val m = kryo.invert(src.map(_.toByte).toArray).get
-    src.close
-    m
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala
 
b/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala
deleted file mode 100644
index f932012..0000000
--- 
a/core/src/main/scala/io/prediction/controller/java/JavaEngineParamsGenerator.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.EngineParams
-import io.prediction.controller.EngineParamsGenerator
-
-import scala.collection.JavaConversions.asScalaBuffer
-
-/** Define an engine parameter generator in Java
-  *
-  * Implementations of this abstract class can be supplied to "pio eval" as 
the second
-  * command line argument.
-  *
-  * @group Evaluation
-  */
-abstract class JavaEngineParamsGenerator extends EngineParamsGenerator {
-
-  /** Set the list of [[EngineParams]].
-    *
-    * @param engineParams A list of engine params
-    */
-  def setEngineParamsList(engineParams: java.util.List[_ <: EngineParams]) {
-    engineParamsList = asScalaBuffer(engineParams)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala 
b/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala
deleted file mode 100644
index 3db89bf..0000000
--- a/core/src/main/scala/io/prediction/controller/java/JavaEvaluation.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.Evaluation
-import io.prediction.controller.Metric
-import io.prediction.core.BaseEngine
-
-import scala.collection.JavaConversions.asScalaBuffer
-
-/** Define an evaluation in Java.
-  *
-  * Implementations of this abstract class can be supplied to "pio eval" as 
the first
-  * argument.
-  *
-  * @group Evaluation
-  */
-
-abstract class JavaEvaluation extends Evaluation {
-  /** Set the [[BaseEngine]] and [[Metric]] for this [[Evaluation]]
-    *
-    * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]]
-    * @param metric [[Metric]] for this [[JavaEvaluation]]
-    * @tparam EI Evaluation information class
-    * @tparam Q Query class
-    * @tparam P Predicted result class
-    * @tparam A Actual result class
-    */
-  def setEngineMetric[EI, Q, P, A](
-    baseEngine: BaseEngine[EI, Q, P, A],
-    metric: Metric[EI, Q, P, A, _]) {
-
-    engineMetric = (baseEngine, metric)
-  }
-
-  /** Set the [[BaseEngine]] and [[Metric]]s for this [[JavaEvaluation]]
-    *
-    * @param baseEngine [[BaseEngine]] for this [[JavaEvaluation]]
-    * @param metric [[Metric]] for this [[JavaEvaluation]]
-    * @param metrics Other [[Metric]]s for this [[JavaEvaluation]]
-    * @tparam EI Evaluation information class
-    * @tparam Q Query class
-    * @tparam P Predicted result class
-    * @tparam A Actual result class
-    */
-  def setEngineMetrics[EI, Q, P, A](
-    baseEngine: BaseEngine[EI, Q, P, A],
-    metric: Metric[EI, Q, P, A, _],
-    metrics: java.util.List[_ <: Metric[EI, Q, P, A, _]]) {
-
-    engineMetrics = (baseEngine, metric, asScalaBuffer(metrics))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala 
b/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala
deleted file mode 100644
index ba6ed2d..0000000
--- a/core/src/main/scala/io/prediction/controller/java/LJavaAlgorithm.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.LAlgorithm
-
-import scala.reflect.ClassTag
-
-/** Base class of a Java local algorithm. Refer to [[LAlgorithm]] for 
documentation.
-  *
-  * @tparam PD Prepared data class.
-  * @tparam M Trained model class.
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Algorithm
-  */
-abstract class LJavaAlgorithm[PD, M, Q, P]
-  extends LAlgorithm[PD, M, Q, P]()(ClassTag.AnyRef.asInstanceOf[ClassTag[M]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala 
b/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala
deleted file mode 100644
index dfafba4..0000000
--- a/core/src/main/scala/io/prediction/controller/java/LJavaDataSource.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.LDataSource
-
-import scala.reflect.ClassTag
-
-/** Base class of a Java local data source. Refer to [[LDataSource]] for 
documentation.
-  *
-  * @tparam TD Training data class.
-  * @tparam EI Evaluation Info class.
-  * @tparam Q Input query class.
-  * @tparam A Actual value class.
-  * @group Data Source
-  */
-abstract class LJavaDataSource[TD, EI, Q, A]
-  extends LDataSource[TD, EI, Q, 
A]()(ClassTag.AnyRef.asInstanceOf[ClassTag[TD]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala 
b/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala
deleted file mode 100644
index 321a100..0000000
--- a/core/src/main/scala/io/prediction/controller/java/LJavaPreparator.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.LPreparator
-
-import scala.reflect.ClassTag
-
-/** Base class of a Java local preparator. Refer to [[LPreparator]] for 
documentation.
-  *
-  * @tparam TD Training data class.
-  * @tparam PD Prepared data class.
-  * @group Preparator
-  */
-abstract class LJavaPreparator[TD, PD]
-  extends LPreparator[TD, PD]()(ClassTag.AnyRef.asInstanceOf[ClassTag[PD]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala 
b/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala
deleted file mode 100644
index f664c38..0000000
--- a/core/src/main/scala/io/prediction/controller/java/LJavaServing.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.LServing
-
-/** Base class of Java local serving. Refer to [[LServing]] for documentation.
-  *
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Serving
-  */
-abstract class LJavaServing[Q, P] extends LServing[Q, P]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala 
b/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala
deleted file mode 100644
index fcf81a0..0000000
--- a/core/src/main/scala/io/prediction/controller/java/P2LJavaAlgorithm.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.P2LAlgorithm
-
-import scala.reflect.ClassTag
-
-/** Base class of a Java parallel-to-local algorithm. Refer to 
[[P2LAlgorithm]] for documentation.
-  *
-  * @tparam PD Prepared data class.
-  * @tparam M Trained model class.
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Algorithm
-  */
-abstract class P2LJavaAlgorithm[PD, M, Q, P]
-  extends P2LAlgorithm[PD, M, Q, P]()(
-    ClassTag.AnyRef.asInstanceOf[ClassTag[M]],
-    ClassTag.AnyRef.asInstanceOf[ClassTag[Q]])

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala 
b/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala
deleted file mode 100644
index d3a370a..0000000
--- a/core/src/main/scala/io/prediction/controller/java/PJavaAlgorithm.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.PAlgorithm
-
-/** Base class of a Java parallel algorithm. Refer to [[PAlgorithm]] for 
documentation.
-  *
-  * @tparam PD Prepared data class.
-  * @tparam M Trained model class.
-  * @tparam Q Input query class.
-  * @tparam P Output prediction class.
-  * @group Algorithm
-  */
-abstract class PJavaAlgorithm[PD, M, Q, P] extends PAlgorithm[PD, M, Q, P]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala 
b/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala
deleted file mode 100644
index 11b962d..0000000
--- a/core/src/main/scala/io/prediction/controller/java/PJavaDataSource.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.PDataSource
-
-/** Base class of a Java parallel data source. Refer to [[PDataSource]] for 
documentation.
-  *
-  * @tparam TD Training data class.
-  * @tparam EI Evaluation Info class.
-  * @tparam Q Input query class.
-  * @tparam A Actual value class.
-  * @group Data Source
-  */
-abstract class PJavaDataSource[TD, EI, Q, A] extends PDataSource[TD, EI, Q, A]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala 
b/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala
deleted file mode 100644
index 2a9c8f9..0000000
--- a/core/src/main/scala/io/prediction/controller/java/PJavaPreparator.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import io.prediction.controller.PPreparator
-
-/** Base class of a Java parallel preparator. Refer to [[PPreparator]] for 
documentation
-  *
-  * @tparam TD Training data class.
-  * @tparam PD Prepared data class.
-  * @group Preparator
-  */
-abstract class PJavaPreparator[TD, PD] extends PPreparator[TD, PD]

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala
 
b/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala
deleted file mode 100644
index 0e92f32..0000000
--- 
a/core/src/main/scala/io/prediction/controller/java/SerializableComparator.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.controller.java
-
-import java.util.Comparator
-
-trait SerializableComparator[T] extends Comparator[T] with java.io.Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/controller/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/controller/package.scala 
b/core/src/main/scala/io/prediction/controller/package.scala
deleted file mode 100644
index bcb4b0d..0000000
--- a/core/src/main/scala/io/prediction/controller/package.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction
-
-/** Provides building blocks for writing a complete prediction engine
-  * consisting of DataSource, Preparator, Algorithm, Serving, and Evaluation.
-  *
-  * == Start Building an Engine ==
-  * The starting point of a prediction engine is the [[Engine]] class.
-  *
-  * == The DASE Paradigm ==
-  * The building blocks together form the DASE paradigm. Learn more about DASE
-  * [[http://docs.prediction.io/customize/ here]].
-  *
-  * == Types of Building Blocks ==
-  * Depending on the problem you are solving, you would need to pick 
appropriate
-  * flavors of building blocks.
-  *
-  * === Engines ===
-  * There are 3 typical engine configurations:
-  *
-  *  1. [[PDataSource]], [[PPreparator]], [[P2LAlgorithm]], [[LServing]]
-  *  2. [[PDataSource]], [[PPreparator]], [[PAlgorithm]], [[LServing]]
-  *  3. [[LDataSource]], [[LPreparator]], [[LAlgorithm]], [[LServing]]
-  *
-  * In both configurations 1 and 2, data is sourced and prepared in a
-  * parallelized fashion, with data type as RDD.
-  *
-  * The difference between configurations 1 and 2 come at the algorithm stage.
-  * In configuration 1, the algorithm operates on potentially large data as 
RDDs
-  * in the Spark cluster, and eventually outputs a model that is small enough 
to
-  * fit in a single machine.
-  *
-  * On the other hand, configuration 2 outputs a model that is potentially too
-  * large to fit in a single machine, and must reside in the Spark cluster as
-  * RDD(s).
-  *
-  * With configuration 1 ([[P2LAlgorithm]]), PredictionIO will automatically
-  * try to persist the model to local disk or HDFS if the model is 
serializable.
-  *
-  * With configuration 2 ([[PAlgorithm]]), PredictionIO will not automatically
-  * try to persist the model, unless the model implements the 
[[PersistentModel]]
-  * trait.
-  *
-  * In special circumstances where both the data and the model are small,
-  * configuration 3 may be used. Beware that RDDs cannot be used with
-  * configuration 3.
-  *
-  * === Data Source ===
-  * [[PDataSource]] is probably the most used data source base class with the
-  * ability to process RDD-based data. [[LDataSource]] '''cannot''' handle
-  * RDD-based data. Use only when you have a special requirement.
-  *
-  * === Preparator ===
-  * With [[PDataSource]], you must pick [[PPreparator]]. The same applies to
-  * [[LDataSource]] and [[LPreparator]].
-  *
-  * === Algorithm ===
-  * The workhorse of the engine comes in 3 different flavors.
-  *
-  * ==== P2LAlgorithm ====
-  * Produces a model that is small enough to fit in a single machine from
-  * [[PDataSource]] and [[PPreparator]]. The model '''cannot''' contain any 
RDD.
-  * If the produced model is serializable, PredictionIO will try to
-  * automatically persist it. In addition, P2LAlgorithm.batchPredict is
-  * already implemented for [[Evaluation]] purpose.
-  *
-  * ==== PAlgorithm ====
-  * Produces a model that could contain RDDs from [[PDataSource]] and
-  * [[PPreparator]]. PredictionIO will not try to persist it automatically
-  * unless the model implements [[PersistentModel]]. 
[[PAlgorithm.batchPredict]]
-  * must be implemented for [[Evaluation]].
-  *
-  * ==== LAlgorithm ====
-  * Produces a model that is small enough to fit in a single machine from
-  * [[LDataSource]] and [[LPreparator]]. The model '''cannot''' contain any 
RDD.
-  * If the produced model is serializable, PredictionIO will try to
-  * automatically persist it. In addition, LAlgorithm.batchPredict is
-  * already implemented for [[Evaluation]] purpose.
-  *
-  * === Serving ===
-  * The serving component comes with only 1 flavor--[[LServing]]. At the 
serving
-  * stage, it is assumed that the result being served is already at a human-
-  * consumable size.
-  *
-  * == Model Persistence ==
-  * PredictionIO tries its best to persist trained models automatically. Please
-  * refer to [[LAlgorithm.makePersistentModel]],
-  * [[P2LAlgorithm.makePersistentModel]], and 
[[PAlgorithm.makePersistentModel]]
-  * for descriptions on different strategies.
-  */
-package object controller {
-
-  /** Base class of several helper types that represent emptiness
-    *
-    * @group Helper
-    */
-  class SerializableClass() extends Serializable
-
-  /** Empty data source parameters.
-    * @group Helper
-    */
-  type EmptyDataSourceParams = EmptyParams
-
-  /** Empty data parameters.
-    * @group Helper
-    */
-  type EmptyDataParams = EmptyParams
-
-  /** Empty evaluation info.
-    * @group Helper
-    */
-  type EmptyEvaluationInfo = SerializableClass
-
-  /** Empty preparator parameters.
-    * @group Helper
-    */
-  type EmptyPreparatorParams = EmptyParams
-
-  /** Empty algorithm parameters.
-    * @group Helper
-    */
-  type EmptyAlgorithmParams = EmptyParams
-
-  /** Empty serving parameters.
-    * @group Helper
-    */
-  type EmptyServingParams = EmptyParams
-
-  /** Empty metrics parameters.
-    * @group Helper
-    */
-  type EmptyMetricsParams = EmptyParams
-
-  /** Empty training data.
-    * @group Helper
-    */
-  type EmptyTrainingData = SerializableClass
-
-  /** Empty prepared data.
-    * @group Helper
-    */
-  type EmptyPreparedData = SerializableClass
-
-  /** Empty model.
-    * @group Helper
-    */
-  type EmptyModel = SerializableClass
-
-  /** Empty actual result.
-    * @group Helper
-    */
-  type EmptyActualResult = SerializableClass
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/AbstractDoer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/AbstractDoer.scala 
b/core/src/main/scala/io/prediction/core/AbstractDoer.scala
deleted file mode 100644
index 0635b27..0000000
--- a/core/src/main/scala/io/prediction/core/AbstractDoer.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.core
-
-import grizzled.slf4j.Logging
-import io.prediction.annotation.DeveloperApi
-import io.prediction.controller.Params
-
-/** :: DeveloperApi ::
-  * Base class for all controllers
-  */
-@DeveloperApi
-abstract class AbstractDoer extends Serializable
-
-/** :: DeveloperApi ::
-  * Provides facility to instantiate controller classes
-  */
-@DeveloperApi
-object Doer extends Logging {
-  /** :: DeveloperApi ::
-    * Instantiates a controller class using supplied controller parameters as
-    * constructor parameters
-    *
-    * @param cls Class of the controller class
-    * @param params Parameters of the controller class
-    * @tparam C Controller class
-    * @return An instance of the controller class
-    */
-  @DeveloperApi
-  def apply[C <: AbstractDoer] (
-    cls: Class[_ <: C], params: Params): C = {
-
-    // Subclasses only allows two kind of constructors.
-    // 1. Constructor with P <: Params.
-    // 2. Emtpy constructor.
-    // First try (1), if failed, try (2).
-    try {
-      val constr = cls.getConstructor(params.getClass)
-      constr.newInstance(params)
-    } catch {
-      case e: NoSuchMethodException => try {
-        val zeroConstr = cls.getConstructor()
-        zeroConstr.newInstance()
-      } catch {
-        case e: NoSuchMethodException =>
-          error(s"${params.getClass.getName} was used as the constructor " +
-            s"argument to ${e.getMessage}, but no constructor can handle it. " 
+
-            "Aborting.")
-          sys.exit(1)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala 
b/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala
deleted file mode 100644
index a3d3fad..0000000
--- a/core/src/main/scala/io/prediction/core/BaseAlgorithm.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.core
-
-import com.google.gson.TypeAdapterFactory
-import io.prediction.annotation.DeveloperApi
-import io.prediction.controller.Params
-import io.prediction.controller.Utils
-import net.jodah.typetools.TypeResolver
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-/** :: DeveloperApi ::
-  * Base trait with default custom query serializer, exposed to engine 
developer
-  * via [[io.prediction.controller.CustomQuerySerializer]]
-  */
-@DeveloperApi
-trait BaseQuerySerializer {
-  /** :: DeveloperApi ::
-    * Serializer for Scala query classes using
-    * [[io.prediction.controller.Utils.json4sDefaultFormats]]
-    */
-  @DeveloperApi
-  @transient lazy val querySerializer = Utils.json4sDefaultFormats
-
-  /** :: DeveloperApi ::
-    * Serializer for Java query classes using Gson
-    */
-  @DeveloperApi
-  @transient lazy val gsonTypeAdapterFactories = Seq.empty[TypeAdapterFactory]
-}
-
-/** :: DeveloperApi ::
-  * Base class of all algorithm controllers
-  *
-  * @tparam PD Prepared data class
-  * @tparam M Model class
-  * @tparam Q Query class
-  * @tparam P Predicted result class
-  */
-@DeveloperApi
-abstract class BaseAlgorithm[PD, M, Q, P]
-  extends AbstractDoer with BaseQuerySerializer {
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. This is called by 
workflow
-    * to train a model.
-    *
-    * @param sc Spark context
-    * @param pd Prepared data
-    * @return Trained model
-    */
-  @DeveloperApi
-  def trainBase(sc: SparkContext, pd: PD): M
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. This is called by
-    * evaluation workflow to perform batch prediction.
-    *
-    * @param sc Spark context
-    * @param bm Model
-    * @param qs Batch of queries
-    * @return Batch of predicted results
-    */
-  @DeveloperApi
-  def batchPredictBase(sc: SparkContext, bm: Any, qs: RDD[(Long, Q)])
-  : RDD[(Long, P)]
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. Called by serving to
-    * perform a single prediction.
-    *
-    * @param bm Model
-    * @param q Query
-    * @return Predicted result
-    */
-  @DeveloperApi
-  def predictBase(bm: Any, q: Q): P
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. Prepare a model for
-    * persistence in the downstream consumer. PredictionIO supports 3 types of
-    * model persistence: automatic persistence, manual persistence, and
-    * re-training on deployment. This method provides a way for downstream
-    * modules to determine which mode the model should be persisted.
-    *
-    * @param sc Spark context
-    * @param modelId Model ID
-    * @param algoParams Algorithm parameters that trained this model
-    * @param bm Model
-    * @return The model itself for automatic persistence, an instance of
-    *         [[io.prediction.workflow.PersistentModelManifest]] for manual
-    *         persistence, or Unit for re-training on deployment
-    */
-  @DeveloperApi
-  def makePersistentModel(
-    sc: SparkContext,
-    modelId: String,
-    algoParams: Params,
-    bm: Any): Any = Unit
-
-  /** :: DeveloperApi ::
-    * Obtains the type signature of query for this algorithm
-    *
-    * @return Type signature of query
-    */
-  def queryClass: Class[Q] = {
-    val types = TypeResolver.resolveRawArguments(classOf[BaseAlgorithm[PD, M, 
Q, P]], getClass)
-    types(2).asInstanceOf[Class[Q]]
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseDataSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/BaseDataSource.scala 
b/core/src/main/scala/io/prediction/core/BaseDataSource.scala
deleted file mode 100644
index dd1157d..0000000
--- a/core/src/main/scala/io/prediction/core/BaseDataSource.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.core
-
-import io.prediction.annotation.DeveloperApi
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-/** :: DeveloperApi ::
-  * Base class of all data source controllers
-  *
-  * @tparam TD Training data class
-  * @tparam EI Evaluation information class
-  * @tparam Q Query class
-  * @tparam A Actual result class
-  */
-@DeveloperApi
-abstract class BaseDataSource[TD, EI, Q, A] extends AbstractDoer {
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. This is called by workflow
-    * to read training data.
-    *
-    * @param sc Spark context
-    * @return Training data
-    */
-  @DeveloperApi
-  def readTrainingBase(sc: SparkContext): TD
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. This is called by
-    * evaluation workflow to read training and validation data.
-    *
-    * @param sc Spark context
-    * @return Sets of training data, evaluation information, queries, and 
actual
-    *         results
-    */
-  @DeveloperApi
-  def readEvalBase(sc: SparkContext): Seq[(TD, EI, RDD[(Q, A)])]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/BaseEngine.scala 
b/core/src/main/scala/io/prediction/core/BaseEngine.scala
deleted file mode 100644
index 5356fa7..0000000
--- a/core/src/main/scala/io/prediction/core/BaseEngine.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.core
-
-import io.prediction.annotation.DeveloperApi
-import io.prediction.controller.EngineParams
-import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption
-import io.prediction.workflow.WorkflowParams
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.json4s.JValue
-
-/** :: DeveloperApi ::
-  * Base class of all engine controller classes
-  *
-  * @tparam EI Evaluation information class
-  * @tparam Q Query class
-  * @tparam P Predicted result class
-  * @tparam A Actual result class
-  */
-@DeveloperApi
-abstract class BaseEngine[EI, Q, P, A] extends Serializable {
-  /** :: DeveloperApi ::
-    * Implement this method so that training this engine would return a list of
-    * models.
-    *
-    * @param sc An instance of SparkContext.
-    * @param engineParams An instance of [[EngineParams]] for running a single 
training.
-    * @param params An instance of [[WorkflowParams]] that controls the 
workflow.
-    * @return A list of models.
-    */
-  @DeveloperApi
-  def train(
-    sc: SparkContext,
-    engineParams: EngineParams,
-    engineInstanceId: String,
-    params: WorkflowParams): Seq[Any]
-
-  /** :: DeveloperApi ::
-    * Implement this method so that [[io.prediction.controller.Evaluation]] can
-    * use this method to generate inputs for 
[[io.prediction.controller.Metric]].
-    *
-    * @param sc An instance of SparkContext.
-    * @param engineParams An instance of [[EngineParams]] for running a single 
evaluation.
-    * @param params An instance of [[WorkflowParams]] that controls the 
workflow.
-    * @return A list of evaluation information and RDD of query, predicted
-    *         result, and actual result tuples.
-    */
-  @DeveloperApi
-  def eval(
-    sc: SparkContext,
-    engineParams: EngineParams,
-    params: WorkflowParams): Seq[(EI, RDD[(Q, P, A)])]
-
-  /** :: DeveloperApi ::
-    * Override this method to further optimize the process that runs multiple
-    * evaluations (during tuning, for example). By default, this method calls
-    * [[eval]] for each element in the engine parameters list.
-    *
-    * @param sc An instance of SparkContext.
-    * @param engineParamsList A list of [[EngineParams]] for running batch 
evaluation.
-    * @param params An instance of [[WorkflowParams]] that controls the 
workflow.
-    * @return A list of engine parameters and evaluation result (from 
[[eval]]) tuples.
-    */
-  @DeveloperApi
-  def batchEval(
-    sc: SparkContext,
-    engineParamsList: Seq[EngineParams],
-    params: WorkflowParams)
-  : Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])] = {
-    engineParamsList.map { engineParams =>
-      (engineParams, eval(sc, engineParams, params))
-    }
-  }
-
-  /** :: DeveloperApi ::
-    * Implement this method to convert a JValue (read from an engine variant
-    * JSON file) to an instance of [[EngineParams]].
-    *
-    * @param variantJson Content of the engine variant JSON as JValue.
-    * @param jsonExtractor JSON extractor option that controls how the JSON is deserialized.
-    * @return An instance of [[EngineParams]] converted from JSON.
-    */
-  @DeveloperApi
-  def jValueToEngineParams(variantJson: JValue, jsonExtractor: 
JsonExtractorOption): EngineParams =
-    throw new NotImplementedError("JSON to EngineParams is not implemented.")
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/BaseEvaluator.scala 
b/core/src/main/scala/io/prediction/core/BaseEvaluator.scala
deleted file mode 100644
index 23fe826..0000000
--- a/core/src/main/scala/io/prediction/core/BaseEvaluator.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.core
-
-import io.prediction.annotation.DeveloperApi
-import io.prediction.annotation.Experimental
-import io.prediction.controller.EngineParams
-import io.prediction.controller.Evaluation
-import io.prediction.workflow.WorkflowParams
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-
-/** :: DeveloperApi ::
-  * Base class of all evaluator controller classes
-  *
-  * @tparam EI Evaluation information class
-  * @tparam Q Query class
-  * @tparam P Predicted result class
-  * @tparam A Actual result class
-  * @tparam ER Evaluation result class
-  */
-@DeveloperApi
-abstract class BaseEvaluator[EI, Q, P, A, ER <: BaseEvaluatorResult]
-  extends AbstractDoer {
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. This is called by
-    * evaluation workflow to perform evaluation.
-    *
-    * @param sc Spark context
-    * @param evaluation Evaluation to run
-    * @param engineEvalDataSet Sets of engine parameters and data for 
evaluation
-    * @param params Evaluation workflow parameters
-    * @return Evaluation result
-    */
-  @DeveloperApi
-  def evaluateBase(
-    sc: SparkContext,
-    evaluation: Evaluation,
-    engineEvalDataSet: Seq[(EngineParams, Seq[(EI, RDD[(Q, P, A)])])],
-    params: WorkflowParams): ER
-}
-
-/** Base trait of evaluator result */
-trait BaseEvaluatorResult extends Serializable {
-  /** A short description of the result */
-  def toOneLiner(): String = ""
-
-  /** HTML portion of the rendered evaluator results */
-  def toHTML(): String = ""
-
-  /** JSON portion of the rendered evaluator results */
-  def toJSON(): String = ""
-
-  /** :: Experimental ::
-    * Indicate if this result is inserted into database
-    */
-  @Experimental
-  val noSave: Boolean = false
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BasePreparator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/BasePreparator.scala 
b/core/src/main/scala/io/prediction/core/BasePreparator.scala
deleted file mode 100644
index d6d0e45..0000000
--- a/core/src/main/scala/io/prediction/core/BasePreparator.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.core
-
-import io.prediction.annotation.DeveloperApi
-import org.apache.spark.SparkContext
-
-/** :: DeveloperApi ::
-  * Base class of all preparator controller classes
-  *
-  * Dev note: Probably will add an extra parameter for ad hoc JSON formatter
-  *
-  * @tparam TD Training data class
-  * @tparam PD Prepared data class
-  */
-@DeveloperApi
-abstract class BasePreparator[TD, PD]
-  extends AbstractDoer {
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. This is called by 
training
-    * workflow to prepare data before handing it over to algorithm
-    *
-    * @param sc Spark context
-    * @param td Training data
-    * @return Prepared data
-    */
-  @DeveloperApi
-  def prepareBase(sc: SparkContext, td: TD): PD
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/BaseServing.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/BaseServing.scala 
b/core/src/main/scala/io/prediction/core/BaseServing.scala
deleted file mode 100644
index d8bde9e..0000000
--- a/core/src/main/scala/io/prediction/core/BaseServing.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.core
-
-import io.prediction.annotation.DeveloperApi
-import io.prediction.annotation.Experimental
-
-/** :: DeveloperApi ::
-  * Base class of all serving controller classes
-  *
-  * @tparam Q Query class
-  * @tparam P Predicted result class
-  */
-@DeveloperApi
-abstract class BaseServing[Q, P]
-  extends AbstractDoer {
-  /** :: Experimental ::
-    * Engine developers should not use this directly. This is called by serving
-    * layer to supplement the query before sending it to algorithms.
-    *
-    * @param q Query
-    * @return A supplemented query
-    */
-  @Experimental
-  def supplementBase(q: Q): Q
-
-  /** :: DeveloperApi ::
-    * Engine developers should not use this directly. This is called by serving
-    * layer to combine multiple predicted results from multiple algorithms, and
-    * custom business logic before serving to the end user.
-    *
-    * @param q Query
-    * @param ps List of predicted results
-    * @return A single predicted result
-    */
-  @DeveloperApi
-  def serveBase(q: Q, ps: Seq[P]): P
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/core/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/core/package.scala 
b/core/src/main/scala/io/prediction/core/package.scala
deleted file mode 100644
index c7586c7..0000000
--- a/core/src/main/scala/io/prediction/core/package.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction
-
-/** Core base classes of PredictionIO controller components. Engine developers
-  * should not use these directly.
-  */
-package object core {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/package.scala 
b/core/src/main/scala/io/prediction/package.scala
deleted file mode 100644
index 3e3cc80..0000000
--- a/core/src/main/scala/io/prediction/package.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io
-
-/** PredictionIO Scala API */
-package object prediction {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala 
b/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala
deleted file mode 100644
index ad93b1a..0000000
--- a/core/src/main/scala/io/prediction/workflow/CoreWorkflow.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import io.prediction.controller.EngineParams
-import io.prediction.controller.Evaluation
-import io.prediction.core.BaseEngine
-import io.prediction.core.BaseEvaluator
-import io.prediction.core.BaseEvaluatorResult
-import io.prediction.data.storage.EngineInstance
-import io.prediction.data.storage.EvaluationInstance
-import io.prediction.data.storage.Model
-import io.prediction.data.storage.Storage
-
-import com.github.nscala_time.time.Imports.DateTime
-import grizzled.slf4j.Logger
-
-import scala.language.existentials
-
-/** CoreWorkflow handles PredictionIO metadata and environment variables of
-  * training and evaluation.
-  */
-object CoreWorkflow {
-  @transient lazy val logger = Logger[this.type]
-  @transient lazy val engineInstances = Storage.getMetaDataEngineInstances
-  @transient lazy val evaluationInstances =
-    Storage.getMetaDataEvaluationInstances()
-
-  def runTrain[EI, Q, P, A](
-      engine: BaseEngine[EI, Q, P, A],
-      engineParams: EngineParams,
-      engineInstance: EngineInstance,
-      env: Map[String, String] = WorkflowUtils.pioEnvVars,
-      params: WorkflowParams = WorkflowParams()) {
-    logger.debug("Starting SparkContext")
-    val mode = "training"
-    WorkflowUtils.checkUpgrade(mode, engineInstance.engineFactory)
-
-    val batch = if (params.batch.nonEmpty) {
-      s"{engineInstance.engineFactory} (${params.batch}})"
-    } else {
-      engineInstance.engineFactory
-    }
-    val sc = WorkflowContext(
-      batch,
-      env,
-      params.sparkEnv,
-      mode.capitalize)
-
-    try {
-
-      val models: Seq[Any] = engine.train(
-        sc = sc,
-        engineParams = engineParams,
-        engineInstanceId = engineInstance.id,
-        params = params
-      )
-
-      val instanceId = Storage.getMetaDataEngineInstances
-
-      val kryo = KryoInstantiator.newKryoInjection
-
-      logger.info("Inserting persistent model")
-      Storage.getModelDataModels.insert(Model(
-        id = engineInstance.id,
-        models = kryo(models)))
-
-      logger.info("Updating engine instance")
-      val engineInstances = Storage.getMetaDataEngineInstances
-      engineInstances.update(engineInstance.copy(
-        status = "COMPLETED",
-        endTime = DateTime.now
-        ))
-
-      logger.info("Training completed successfully.")
-    } catch {
-      case e @(
-          _: StopAfterReadInterruption |
-          _: StopAfterPrepareInterruption) => {
-        logger.info(s"Training interrupted by $e.")
-      }
-    } finally {
-      logger.debug("Stopping SparkContext")
-      sc.stop()
-    }
-  }
-
-  def runEvaluation[EI, Q, P, A, R <: BaseEvaluatorResult](
-      evaluation: Evaluation,
-      engine: BaseEngine[EI, Q, P, A],
-      engineParamsList: Seq[EngineParams],
-      evaluationInstance: EvaluationInstance,
-      evaluator: BaseEvaluator[EI, Q, P, A, R],
-      env: Map[String, String] = WorkflowUtils.pioEnvVars,
-      params: WorkflowParams = WorkflowParams()) {
-    logger.info("runEvaluation started")
-    logger.debug("Start SparkContext")
-
-    val mode = "evaluation"
-
-    WorkflowUtils.checkUpgrade(mode, engine.getClass.getName)
-
-    val batch = if (params.batch.nonEmpty) {
-      s"{evaluation.getClass.getName} (${params.batch}})"
-    } else {
-      evaluation.getClass.getName
-    }
-    val sc = WorkflowContext(
-      batch,
-      env,
-      params.sparkEnv,
-      mode.capitalize)
-    val evaluationInstanceId = evaluationInstances.insert(evaluationInstance)
-
-    logger.info(s"Starting evaluation instance ID: $evaluationInstanceId")
-
-    val evaluatorResult: BaseEvaluatorResult = 
EvaluationWorkflow.runEvaluation(
-      sc,
-      evaluation,
-      engine,
-      engineParamsList,
-      evaluator,
-      params)
-
-    if (evaluatorResult.noSave) {
-      logger.info(s"This evaluation result is not inserted into database: 
$evaluatorResult")
-    } else {
-      val evaluatedEvaluationInstance = evaluationInstance.copy(
-        status = "EVALCOMPLETED",
-        id = evaluationInstanceId,
-        endTime = DateTime.now,
-        evaluatorResults = evaluatorResult.toOneLiner,
-        evaluatorResultsHTML = evaluatorResult.toHTML,
-        evaluatorResultsJSON = evaluatorResult.toJSON
-      )
-
-      logger.info(s"Updating evaluation instance with result: 
$evaluatorResult")
-
-      evaluationInstances.update(evaluatedEvaluationInstance)
-    }
-
-    logger.debug("Stop SparkContext")
-
-    sc.stop()
-
-    logger.info("runEvaluation completed")
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4f03388e/core/src/main/scala/io/prediction/workflow/CreateServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/prediction/workflow/CreateServer.scala 
b/core/src/main/scala/io/prediction/workflow/CreateServer.scala
deleted file mode 100644
index a664187..0000000
--- a/core/src/main/scala/io/prediction/workflow/CreateServer.scala
+++ /dev/null
@@ -1,737 +0,0 @@
-/** Copyright 2015 TappingStone, Inc.
-  *
-  * Licensed under the Apache License, Version 2.0 (the "License");
-  * you may not use this file except in compliance with the License.
-  * You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package io.prediction.workflow
-
-import java.io.PrintWriter
-import java.io.Serializable
-import java.io.StringWriter
-import java.util.concurrent.TimeUnit
-
-import akka.actor._
-import akka.event.Logging
-import akka.io.IO
-import akka.pattern.ask
-import akka.util.Timeout
-import com.github.nscala_time.time.Imports.DateTime
-import com.twitter.bijection.Injection
-import com.twitter.chill.KryoBase
-import com.twitter.chill.KryoInjection
-import com.twitter.chill.ScalaKryoInstantiator
-import com.typesafe.config.ConfigFactory
-import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer
-import grizzled.slf4j.Logging
-import io.prediction.authentication.KeyAuthentication
-import io.prediction.configuration.SSLConfiguration
-import io.prediction.controller.Engine
-import io.prediction.controller.Params
-import io.prediction.controller.Utils
-import io.prediction.controller.WithPrId
-import io.prediction.core.BaseAlgorithm
-import io.prediction.core.BaseServing
-import io.prediction.core.Doer
-import io.prediction.data.storage.EngineInstance
-import io.prediction.data.storage.EngineManifest
-import io.prediction.data.storage.Storage
-import io.prediction.workflow.JsonExtractorOption.JsonExtractorOption
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.write
-import spray.can.Http
-import spray.can.server.ServerSettings
-import spray.http.MediaTypes._
-import spray.http._
-import spray.httpx.Json4sSupport
-import spray.routing._
-import spray.routing.authentication.{UserPass, BasicAuth}
-
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.concurrent.duration._
-import scala.concurrent.future
-import scala.language.existentials
-import scala.util.Failure
-import scala.util.Random
-import scala.util.Success
-import scalaj.http.HttpOptions
-
/** A [[ScalaKryoInstantiator]] that pins every Kryo instance it creates to a
  * specific class loader and registers serializers for Java's synchronized
  * collection wrappers.
  *
  * @param classLoader the class loader used by Kryo to resolve classes during
  *                    (de)serialization
  */
class KryoInstantiator(classLoader: ClassLoader) extends ScalaKryoInstantiator {
  override def newKryo(): KryoBase = {
    val instance = super.newKryo()
    instance.setClassLoader(classLoader)
    SynchronizedCollectionsSerializer.registerSerializers(instance)
    instance
  }
}
-
/** Companion factory for Kryo-based serialization injections. */
object KryoInstantiator extends Serializable {
  /** Builds an `Injection[Any, Array[Byte]]` backed by a [[KryoInstantiator]]
    * bound to this object's class loader.
    */
  def newKryoInjection : Injection[Any, Array[Byte]] =
    KryoInjection.instance(new KryoInstantiator(getClass.getClassLoader))
}
-
/** Command-line configuration for the engine server process.
  *
  * Populated by the scopt parser in CreateServer.main. Only
  * `engineInstanceId` and `engineVariant` are required on the command line;
  * every other field falls back to the default below.
  */
case class ServerConfig(
  batch: String = "",                   // batch label of the deployment
  engineInstanceId: String = "",        // engine instance to serve (required)
  engineId: Option[String] = None,      // overrides the instance's engine ID
  engineVersion: Option[String] = None, // overrides the instance's engine version
  engineVariant: String = "",           // engine variant JSON (required)
  env: Option[String] = None,           // comma-separated FOO=BAR pairs passed to Spark
  ip: String = "0.0.0.0",               // interface to bind the HTTP server to
  port: Int = 8000,                     // port to bind to
  feedback: Boolean = false,            // enable feedback loop to the event server
  eventServerIp: String = "0.0.0.0",    // event server host (used when feedback is on)
  eventServerPort: Int = 7070,          // event server port (used when feedback is on)
  accessKey: Option[String] = None,     // event server access key; required for feedback
  logUrl: Option[String] = None,        // remote endpoint for error logs, if any
  logPrefix: Option[String] = None,     // prefix prepended to remote log payloads
  logFile: Option[String] = None,       // NOTE(review): parsed but not read in this file
  verbose: Boolean = false,             // enable verbose logging
  debug: Boolean = false,               // enable debug output
  jsonExtractor: JsonExtractorOption = JsonExtractorOption.Both) // query/params JSON extractor mode
-
/** Message protocol between CreateServer, MasterActor, and UpgradeActor. */
case class StartServer()   // ask MasterActor to create a server backend and bind it
case class BindServer()    // bind the current server actor to the configured ip/port
case class StopServer()    // unbind the listener and shut down the actor system
case class ReloadServer()  // rebuild the backend from the latest completed engine instance
case class UpgradeCheck()  // periodic trigger handled by UpgradeActor
-
-
/** Stand-alone engine server launcher.
  *
  * Parses command-line arguments into a [[ServerConfig]], looks up the
  * requested engine instance and manifest from metadata storage, then starts
  * a MasterActor that owns the HTTP server lifecycle and an UpgradeActor that
  * periodically checks for PredictionIO upgrades.
  */
object CreateServer extends Logging {
  // Actor system and storage clients shared by the whole server process.
  val actorSystem = ActorSystem("pio-server")
  val engineInstances = Storage.getMetaDataEngineInstances
  val engineManifests = Storage.getMetaDataEngineManifests
  val modeldata = Storage.getModelDataModels

  /** Entry point: parse arguments, resolve engine metadata, start actors. */
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[ServerConfig]("CreateServer") {
      opt[String]("batch") action { (x, c) =>
        c.copy(batch = x)
      } text("Batch label of the deployment.")
      opt[String]("engineId") action { (x, c) =>
        c.copy(engineId = Some(x))
      } text("Engine ID.")
      opt[String]("engineVersion") action { (x, c) =>
        c.copy(engineVersion = Some(x))
      } text("Engine version.")
      opt[String]("engine-variant") required() action { (x, c) =>
        c.copy(engineVariant = x)
      } text("Engine variant JSON.")
      opt[String]("ip") action { (x, c) =>
        c.copy(ip = x)
      }
      opt[String]("env") action { (x, c) =>
        c.copy(env = Some(x))
      } text("Comma-separated list of environmental variables (in 'FOO=BAR' " +
        "format) to pass to the Spark execution environment.")
      opt[Int]("port") action { (x, c) =>
        c.copy(port = x)
      } text("Port to bind to (default: 8000).")
      opt[String]("engineInstanceId") required() action { (x, c) =>
        c.copy(engineInstanceId = x)
      } text("Engine instance ID.")
      opt[Unit]("feedback") action { (_, c) =>
        c.copy(feedback = true)
      } text("Enable feedback loop to event server.")
      opt[String]("event-server-ip") action { (x, c) =>
        c.copy(eventServerIp = x)
      }
      opt[Int]("event-server-port") action { (x, c) =>
        c.copy(eventServerPort = x)
      } text("Event server port. Default: 7070")
      opt[String]("accesskey") action { (x, c) =>
        c.copy(accessKey = Some(x))
      } text("Event server access key.")
      opt[String]("log-url") action { (x, c) =>
        c.copy(logUrl = Some(x))
      }
      opt[String]("log-prefix") action { (x, c) =>
        c.copy(logPrefix = Some(x))
      }
      opt[String]("log-file") action { (x, c) =>
        c.copy(logFile = Some(x))
      }
      opt[Unit]("verbose") action { (x, c) =>
        c.copy(verbose = true)
      } text("Enable verbose output.")
      opt[Unit]("debug") action { (x, c) =>
        c.copy(debug = true)
      } text("Enable debug output.")
      opt[String]("json-extractor") action { (x, c) =>
        c.copy(jsonExtractor = JsonExtractorOption.withName(x))
      }
    }

    parser.parse(args, ServerConfig()) map { sc =>
      WorkflowUtils.modifyLogging(sc.verbose)
      engineInstances.get(sc.engineInstanceId) map { engineInstance =>
        // Command-line engine ID/version take precedence over the values
        // recorded in the stored engine instance.
        val engineId = sc.engineId.getOrElse(engineInstance.engineId)
        val engineVersion = sc.engineVersion.getOrElse(
          engineInstance.engineVersion)
        engineManifests.get(engineId, engineVersion) map { manifest =>
          val engineFactoryName = engineInstance.engineFactory
          // Schedule an upgrade check immediately and then once a day.
          val upgrade = actorSystem.actorOf(Props(
            classOf[UpgradeActor],
            engineFactoryName))
          actorSystem.scheduler.schedule(
            0.seconds,
            1.days,
            upgrade,
            UpgradeCheck())
          val master = actorSystem.actorOf(Props(
            classOf[MasterActor],
            sc,
            engineInstance,
            engineFactoryName,
            manifest),
          "master")
          implicit val timeout = Timeout(5.seconds)
          // Fire-and-forget start; the process then blocks until the actor
          // system terminates (e.g. via the /stop route).
          master ? StartServer()
          actorSystem.awaitTermination
        } getOrElse {
          error(s"Invalid engine ID or version. Aborting server.")
        }
      } getOrElse {
        error(s"Invalid engine instance ID. Aborting server.")
      }
    }
  }

  /** Deserializes the trained models of `engineInstance`, prepares them for
    * serving, instantiates algorithm and serving components, and returns a
    * [[ServerActor]] wired with all of them.
    */
  def createServerActorWithEngine[TD, EIN, PD, Q, P, A](
    sc: ServerConfig,
    engineInstance: EngineInstance,
    engine: Engine[TD, EIN, PD, Q, P, A],
    engineLanguage: EngineLanguage.Value,
    manifest: EngineManifest): ActorRef = {

    // Rebuild the engine parameters that were used to train this instance.
    val engineParams = engine.engineInstanceToEngineParams(engineInstance,
      sc.jsonExtractor)

    val kryo = KryoInstantiator.newKryoInjection

    // NOTE(review): both .get calls below throw if the persisted model is
    // missing from model data storage or cannot be deserialized — confirm
    // this fail-fast behavior is intended.
    val modelsFromEngineInstance =
      kryo.invert(modeldata.get(engineInstance.id).get.models).get.
      asInstanceOf[Seq[Any]]

    // Batch label for the Spark context: "<factory> (<batch>)" or just the
    // factory name when no batch label was given.
    val batch = if (engineInstance.batch.nonEmpty) {
      s"${engineInstance.engineFactory} (${engineInstance.batch})"
    } else {
      engineInstance.engineFactory
    }

    val sparkContext = WorkflowContext(
      batch = batch,
      executorEnv = engineInstance.env,
      mode = "Serving",
      sparkEnv = engineInstance.sparkConf)

    // Let the engine prepare the deserialized models for deployment.
    val models = engine.prepareDeploy(
      sparkContext,
      engineParams,
      engineInstance.id,
      modelsFromEngineInstance,
      params = WorkflowParams()
    )

    // Instantiate one algorithm object per (name, params) pair.
    val algorithms = engineParams.algorithmParamsList.map { case (n, p) =>
      Doer(engine.algorithmClassMap(n), p)
    }

    val servingParamsWithName = engineParams.servingParams

    val serving = Doer(engine.servingClassMap(servingParamsWithName._1),
      servingParamsWithName._2)

    actorSystem.actorOf(
      Props(
        classOf[ServerActor[Q, P]],
        sc,
        engineInstance,
        engine,
        engineLanguage,
        manifest,
        engineParams.dataSourceParams._2,
        engineParams.preparatorParams._2,
        algorithms,
        engineParams.algorithmParamsList.map(_._2),
        models,
        serving,
        engineParams.servingParams._2))
  }
}
-
/** Actor that performs the periodic upgrade check scheduled by
  * CreateServer.main (immediately at startup, then daily).
  *
  * @param engineClass fully-qualified engine factory class name reported to
  *                    the upgrade check
  */
class UpgradeActor(engineClass: String) extends Actor {
  val log = Logging(context.system, this)
  implicit val system = context.system
  def receive: Actor.Receive = {
    case x: UpgradeCheck =>
      WorkflowUtils.checkUpgrade("deployment", engineClass)
  }
}
-
/** Supervises the HTTP server backend: creates [[ServerActor]]s, binds and
  * unbinds the spray HTTP listener (with SSL), and handles stop/reload
  * commands sent via the server's authenticated routes.
  */
class MasterActor (
    sc: ServerConfig,
    engineInstance: EngineInstance,
    engineFactoryName: String,
    manifest: EngineManifest) extends Actor with SSLConfiguration with
    KeyAuthentication {
  val log = Logging(context.system, this)
  implicit val system = context.system
  // Listener ActorRef handed back by spray on a successful bind; needed to
  // unbind later.
  var sprayHttpListener: Option[ActorRef] = None
  // The ServerActor currently handling requests, if any.
  var currentServerActor: Option[ActorRef] = None
  // Remaining bind retries before giving up and shutting down.
  var retry = 3

  /** Best-effort: asks any engine server already occupying ip:port to stop
    * itself via its authenticated /stop route. Failures are only logged.
    */
  def undeploy(ip: String, port: Int): Unit = {
    val serverUrl = s"https://${ip}:${port}"
    log.info(
      s"Undeploying any existing engine instance at $serverUrl")
    try {
      val code = scalaj.http.Http(s"$serverUrl/stop")
        .option(HttpOptions.allowUnsafeSSL)
        .param(ServerKey.param, ServerKey.get)
        .method("POST").asString.code
      code match {
        case 200 => Unit
        case 404 => log.error(
          s"Another process is using $serverUrl. Unable to undeploy.")
        case _ => log.error(
          s"Another process is using $serverUrl, or an existing " +
          s"engine server is not responding properly (HTTP $code). " +
          "Unable to undeploy.")
      }
    } catch {
      case e: java.net.ConnectException =>
        // Nothing is listening there; the port is free.
        log.warning(s"Nothing at $serverUrl")
      case _: Throwable =>
        log.error("Another process might be occupying " +
          s"$ip:$port. Unable to undeploy.")
    }
  }

  def receive: Actor.Receive = {
    case x: StartServer =>
      // Create the backend first, then free the port, then bind.
      val actor = createServerActor(
        sc,
        engineInstance,
        engineFactoryName,
        manifest)
      currentServerActor = Some(actor)
      undeploy(sc.ip, sc.port)
      self ! BindServer()
    case x: BindServer =>
      currentServerActor map { actor =>
        val settings = ServerSettings(system)
        IO(Http) ! Http.Bind(
          actor,
          interface = sc.ip,
          port = sc.port,
          settings = Some(settings.copy(sslEncryption = true)))
      } getOrElse {
        log.error("Cannot bind a non-existing server backend.")
      }
    case x: StopServer =>
      log.info(s"Stop server command received.")
      sprayHttpListener.map { l =>
        log.info("Server is shutting down.")
        // Give in-flight requests up to 5 seconds before terminating.
        l ! Http.Unbind(5.seconds)
        system.shutdown
      } getOrElse {
        log.warning("No active server is running.")
      }
    case x: ReloadServer =>
      log.info("Reload server command received.")
      val latestEngineInstance =
        CreateServer.engineInstances.getLatestCompleted(
          manifest.id,
          manifest.version,
          engineInstance.engineVariant)
      latestEngineInstance map { lr =>
        // Hot-swap the backend: build a new server actor from the latest
        // completed instance, rebind, then kill the old actor.
        val actor = createServerActor(sc, lr, engineFactoryName, manifest)
        sprayHttpListener.map { l =>
          l ! Http.Unbind(5.seconds)
          val settings = ServerSettings(system)
          IO(Http) ! Http.Bind(
            actor,
            interface = sc.ip,
            port = sc.port,
            settings = Some(settings.copy(sslEncryption = true)))
          currentServerActor.get ! Kill
          currentServerActor = Some(actor)
        } getOrElse {
          log.warning("No active server is running. Abort reloading.")
        }
      } getOrElse {
        log.warning(
          s"No latest completed engine instance for ${manifest.id} " +
          s"${manifest.version}. Abort reloading.")
      }
    case x: Http.Bound =>
      // Successful bind: remember the listener so we can unbind later.
      val serverUrl = s"https://${sc.ip}:${sc.port}"
      log.info(s"Engine is deployed and running. Engine API is live at ${serverUrl}.")
      sprayHttpListener = Some(sender)
    case x: Http.CommandFailed =>
      // Bind failed; retry a few times (1 second apart) before shutting down.
      if (retry > 0) {
        retry -= 1
        log.error(s"Bind failed. Retrying... ($retry more trial(s))")
        context.system.scheduler.scheduleOnce(1.seconds) {
          self ! BindServer()
        }
      } else {
        log.error("Bind failed. Shutting down.")
        system.shutdown
      }
  }

  /** Instantiates the engine from its factory and wraps it in a ServerActor.
    *
    * @throws NoSuchMethodException if the factory returns a base engine that
    *                               is not a deployable [[Engine]]
    */
  def createServerActor(
      sc: ServerConfig,
      engineInstance: EngineInstance,
      engineFactoryName: String,
      manifest: EngineManifest): ActorRef = {
    val (engineLanguage, engineFactory) =
      WorkflowUtils.getEngine(engineFactoryName, getClass.getClassLoader)
    val engine = engineFactory()

    // EngineFactory return a base engine, which may not be deployable.
    if (!engine.isInstanceOf[Engine[_,_,_,_,_,_]]) {
      throw new NoSuchMethodException(s"Engine $engine is not deployable")
    }

    val deployableEngine = engine.asInstanceOf[Engine[_,_,_,_,_,_]]

    CreateServer.createServerActorWithEngine(
      sc,
      engineInstance,
      // engine,
      deployableEngine,
      engineLanguage,
      manifest)
  }
}
-
/** spray HTTP service that serves queries against a deployed engine.
  *
  * Routes:
  *  - GET  /             : HTML status page
  *  - POST /queries.json : run a query through serving/algorithms and return
  *                         the prediction as JSON
  *  - POST /reload       : ask the MasterActor to hot-swap the backend
  *  - POST /stop         : ask the MasterActor to shut the server down
  *  - GET  /assets/...   : static resources
  *  - GET  /plugins.json and /plugins/... : engine server plugin metadata/REST
  */
class ServerActor[Q, P](
    val args: ServerConfig,
    val engineInstance: EngineInstance,
    val engine: Engine[_, _, _, Q, P, _],
    val engineLanguage: EngineLanguage.Value,
    val manifest: EngineManifest,
    val dataSourceParams: Params,
    val preparatorParams: Params,
    val algorithms: Seq[BaseAlgorithm[_, _, Q, P]],
    val algorithmsParams: Seq[Params],
    val models: Seq[Any],
    val serving: BaseServing[Q, P],
    val servingParams: Params) extends Actor with HttpService with
    KeyAuthentication {
  val serverStartTime = DateTime.now
  val log = Logging(context.system, this)

  // Simple serving statistics shown on the status page.
  var requestCount: Int = 0
  var avgServingSec: Double = 0.0
  var lastServingSec: Double = 0.0

  /** The following is required by HttpService */
  def actorRefFactory: ActorContext = context

  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val pluginsActorRef =
    context.actorOf(Props(classOf[PluginsActor], args.engineVariant),
      "PluginsActor")
  val pluginContext = EngineServerPluginContext(log, args.engineVariant)

  def receive: Actor.Receive = runRoute(myRoute)

  // Feedback requires an access key; refuse to enable it otherwise.
  val feedbackEnabled = if (args.feedback) {
    if (args.accessKey.isEmpty) {
      log.error("Feedback loop cannot be enabled because accessKey is empty.")
      false
    } else {
      true
    }
  } else false

  /** Posts an error report to the configured remote log endpoint.
    * Failures are logged locally and otherwise ignored (best-effort).
    */
  def remoteLog(logUrl: String, logPrefix: String, message: String): Unit = {
    implicit val formats = Utils.json4sDefaultFormats
    try {
      scalaj.http.Http(logUrl).postData(
        logPrefix + write(Map(
          "engineInstance" -> engineInstance,
          "message" -> message))).asString
    } catch {
      case e: Throwable =>
        log.error(s"Unable to send remote log: ${e.getMessage}")
    }
  }

  /** Renders a throwable's stack trace as a string. */
  def getStackTraceString(e: Throwable): String = {
    val writer = new StringWriter()
    val printWriter = new PrintWriter(writer)
    e.printStackTrace(printWriter)
    writer.toString
  }

  val myRoute =
    path("") {
      get {
        respondWithMediaType(`text/html`) {
          detach() {
            complete {
              // Twirl template rendering the deployment status page.
              html.index(
                args,
                manifest,
                engineInstance,
                algorithms.map(_.toString),
                algorithmsParams.map(_.toString),
                models.map(_.toString),
                dataSourceParams.toString,
                preparatorParams.toString,
                servingParams.toString,
                serverStartTime,
                feedbackEnabled,
                args.eventServerIp,
                args.eventServerPort,
                requestCount,
                avgServingSec,
                lastServingSec
              ).toString
            }
          }
        }
      }
    } ~
    path("queries.json") {
      post {
        detach() {
          entity(as[String]) { queryString =>
            try {
              val servingStartTime = DateTime.now
              val jsonExtractorOption = args.jsonExtractor
              val queryTime = DateTime.now
              // Extract Query from Json
              val query = JsonExtractor.extract(
                jsonExtractorOption,
                queryString,
                algorithms.head.queryClass,
                algorithms.head.querySerializer,
                algorithms.head.gsonTypeAdapterFactories
              )
              val queryJValue = JsonExtractor.toJValue(
                jsonExtractorOption,
                query,
                algorithms.head.querySerializer,
                algorithms.head.gsonTypeAdapterFactories)
              // Deploy logic. First call Serving.supplement, then Algo.predict,
              // finally Serving.serve.
              val supplementedQuery = serving.supplementBase(query)
              // TODO: Parallelize the following.
              val predictions = algorithms.zipWithIndex.map { case (a, ai) =>
                a.predictBase(models(ai), supplementedQuery)
              }
              // Notice that it is by design to call Serving.serve with the
              // *original* query.
              val prediction = serving.serveBase(query, predictions)
              val predictionJValue = JsonExtractor.toJValue(
                jsonExtractorOption,
                prediction,
                algorithms.head.querySerializer,
                algorithms.head.gsonTypeAdapterFactories)
              /** Handle feedback to Event Server
                * Send the following back to the Event Server
                * - appId
                * - engineInstanceId
                * - query
                * - prediction
                * - prId
                */
              val result = if (feedbackEnabled) {
                implicit val formats =
                  algorithms.headOption map { alg =>
                    alg.querySerializer
                  } getOrElse {
                    Utils.json4sDefaultFormats
                  }
                // val genPrId = Random.alphanumeric.take(64).mkString
                def genPrId: String = Random.alphanumeric.take(64).mkString
                val newPrId = prediction match {
                  case id: WithPrId =>
                    val org = id.prId
                    if (org.isEmpty) genPrId else org
                  case _ => genPrId
                }

                // also save Query's prId as prId of this pio_pr predict events
                val queryPrId =
                  query match {
                    case id: WithPrId =>
                      Map("prId" -> id.prId)
                    case _ =>
                      Map()
                  }
                val data = Map(
                  // "appId" -> dataSourceParams.asInstanceOf[ParamsWithAppId].appId,
                  "event" -> "predict",
                  "eventTime" -> queryTime.toString(),
                  "entityType" -> "pio_pr", // prediction result
                  "entityId" -> newPrId,
                  "properties" -> Map(
                    "engineInstanceId" -> engineInstance.id,
                    "query" -> query,
                    "prediction" -> prediction)) ++ queryPrId
                // At this point args.accessKey should be Some(String).
                val accessKey = args.accessKey.getOrElse("")
                // Fire the feedback event asynchronously; the response to the
                // client is not delayed by it.
                val f: Future[Int] = future {
                  scalaj.http.Http(
                    s"http://${args.eventServerIp}:${args.eventServerPort}/" +
                    s"events.json?accessKey=$accessKey").postData(
                    write(data)).header(
                    "content-type", "application/json").asString.code
                }
                f onComplete {
                  case Success(code) => {
                    if (code != 201) {
                      log.error(s"Feedback event failed. Status code: $code."
                        + s"Data: ${write(data)}.")
                    }
                  }
                  case Failure(t) => {
                    log.error(s"Feedback event failed: ${t.getMessage}") }
                }
                // overwrite prId in predictedResult
                // - if it is WithPrId,
                //   then overwrite with new prId
                // - if it is not WithPrId, no prId injection
                if (prediction.isInstanceOf[WithPrId]) {
                  predictionJValue merge parse(s"""{"prId" : "$newPrId"}""")
                } else {
                  predictionJValue
                }
              } else predictionJValue

              // Let every registered output blocker plugin post-process the
              // prediction JSON before it is returned.
              val pluginResult =
                pluginContext.outputBlockers.values.foldLeft(result) { case
                  (r, p) =>
                  p.process(engineInstance, queryJValue, r, pluginContext)
                }

              // Bookkeeping
              val servingEndTime = DateTime.now
              lastServingSec =
                (servingEndTime.getMillis - servingStartTime.getMillis) /
                1000.0
              avgServingSec =
                ((avgServingSec * requestCount) + lastServingSec) /
                (requestCount + 1)
              requestCount += 1

              respondWithMediaType(`application/json`) {
                complete(compact(render(pluginResult)))
              }
            } catch {
              case e: MappingException =>
                // Malformed query JSON: report 400 and optionally remote-log.
                log.error(
                  s"Query '$queryString' is invalid. Reason: ${e.getMessage}")
                args.logUrl map { url =>
                  remoteLog(
                    url,
                    args.logPrefix.getOrElse(""),
                    s"Query:\n$queryString\n\nStack Trace:\n" +
                      s"${getStackTraceString(e)}\n\n")
                  }
                complete(StatusCodes.BadRequest, e.getMessage)
              case e: Throwable =>
                // Anything else: report 500 with the stack trace.
                val msg = s"Query:\n$queryString\n\nStack Trace:\n" +
                  s"${getStackTraceString(e)}\n\n"
                log.error(msg)
                args.logUrl map { url =>
                  remoteLog(
                    url,
                    args.logPrefix.getOrElse(""),
                    msg)
                  }
                complete(StatusCodes.InternalServerError, msg)
            }
          }
        }
      }
    } ~
    path("reload") {
      authenticate(withAccessKeyFromFile) { request =>
        post {
          complete {
            context.actorSelection("/user/master") ! ReloadServer()
            "Reloading..."
          }
        }
      }
    } ~
    path("stop") {
      authenticate(withAccessKeyFromFile) { request =>
        post {
          complete {
            // Delay the stop so this response can still be delivered.
            context.system.scheduler.scheduleOnce(1.seconds) {
              context.actorSelection("/user/master") ! StopServer()
            }
            "Shutting down..."
          }
        }
      }
    } ~
    pathPrefix("assets") {
      getFromResourceDirectory("assets")
    } ~
    path("plugins.json") {
      import EngineServerJson4sSupport._
      get {
        respondWithMediaType(MediaTypes.`application/json`) {
          complete {
            // Describe all registered output blocker/sniffer plugins.
            Map("plugins" -> Map(
              "outputblockers" -> pluginContext.outputBlockers.map { case (n,
                p) =>
                n -> Map(
                  "name" -> p.pluginName,
                  "description" -> p.pluginDescription,
                  "class" -> p.getClass.getName,
                  "params" -> pluginContext.pluginParams(p.pluginName))
              },
              "outputsniffers" -> pluginContext.outputSniffers.map { case (n,
                p) =>
                n -> Map(
                  "name" -> p.pluginName,
                  "description" -> p.pluginDescription,
                  "class" -> p.getClass.getName,
                  "params" -> pluginContext.pluginParams(p.pluginName))
              }
            ))
          }
        }
      }
    } ~
    path("plugins" / Segments) { segments =>
      import EngineServerJson4sSupport._
      get {
        respondWithMediaType(MediaTypes.`application/json`) {
          complete {
            // URL shape: /plugins/<type>/<name>/<args...>
            val pluginArgs = segments.drop(2)
            val pluginType = segments(0)
            val pluginName = segments(1)
            // NOTE(review): non-outputSniffer plugin types fall through this
            // match and raise MatchError — confirm that is intended.
            pluginType match {
              case EngineServerPlugin.outputSniffer =>
                pluginsActorRef ? PluginsActor.HandleREST(
                  pluginName = pluginName,
                  pluginArgs = pluginArgs) map {
                  _.asInstanceOf[String]
                }
            }
          }
        }
      }
    }
}
-
/** Json4s (un)marshalling support for the engine server's plugin routes. */
object EngineServerJson4sSupport extends Json4sSupport {
  implicit def json4sFormats: Formats = DefaultFormats
}

Reply via email to