http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/build.sbt b/examples/scala-parallel-similarproduct/add-rateevent/build.sbt
deleted file mode 100644
index ef66b2f..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-similarproduct"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio"    %% "core"          % pioVersion.value % "provided",
-  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py b/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py
deleted file mode 100644
index 6107d1c..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py
+++ /dev/null
@@ -1,90 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for similar product engine
-"""
-
-import predictionio
-import argparse
-import random
-
-SEED = 3
-
-def import_events(client):
-  random.seed(SEED)
-  count = 0
-  print client.get_status()
-  print "Importing data..."
-
-  # generate 10 users, with user ids u1,u2,....,u10
-  user_ids = ["u%s" % i for i in range(1, 11)]
-  for user_id in user_ids:
-    print "Set user", user_id
-    client.create_event(
-      event="$set",
-      entity_type="user",
-      entity_id=user_id
-    )
-    count += 1
-
-  # generate 50 items, with item ids i1,i2,....,i50
-  # random assign 1 to 4 categories among c1-c6 to items
-  categories = ["c%s" % i for i in range(1, 7)]
-  item_ids = ["i%s" % i for i in range(1, 51)]
-  for item_id in item_ids:
-    print "Set item", item_id
-    client.create_event(
-      event="$set",
-      entity_type="item",
-      entity_id=item_id,
-      properties={
-        "categories" : random.sample(categories, random.randint(1, 4))
-      }
-    )
-    count += 1
-
-  # each user randomly viewed 10 items
-  for user_id in user_ids:
-    for viewed_item in random.sample(item_ids, 10):
-      print "User", user_id ,"views item", viewed_item
-      client.create_event(
-        event="view",
-        entity_type="user",
-        entity_id=user_id,
-        target_entity_type="item",
-        target_entity_id=viewed_item
-      )
-      count += 1
-
-  print "%s events are imported." % count
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for similar product engine")
-  parser.add_argument('--access_key', default='invald_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py b/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py
deleted file mode 100644
index 8678b15..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"items": ["i1", "i3"], "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/engine.json b/examples/scala-parallel-similarproduct/add-rateevent/engine.json
deleted file mode 100644
index c55849f..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/engine.json
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.similarproduct.SimilarProductEngine",
-  "datasource": {
-    "params" : {
-      "appId": 9
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations" : 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt b/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt b/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt
deleted file mode 100644
index 9aed0ee..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index b5c3d3f..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.P2LAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-
-case class ALSAlgorithmParams(
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]) extends Params
-
-class ALSModel(
-  val productFeatures: Map[Int, Array[Double]],
-  val itemStringIntMap: BiMap[String, Int],
-  val items: Map[Int, Item]
-) extends Serializable {
-
-  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
-  override def toString = {
-    s" productFeatures: [${productFeatures.size}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2).toString}...)]" +
-    s" items: [${items.size}]" +
-    s"(${items.take(2).toString}...)]"
-  }
-}
-
-/**
-  * Use ALS to build item x feature matrix
-  */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(sc:SparkContext ,data: PreparedData): ALSModel = {
-    require(!data.rateEvents.take(1).isEmpty,
-      s"rateEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.users.keys)
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    // collect Item as Map and convert ID to Int index
-    val items: Map[Int, Item] = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }.collectAsMap.toMap
-
-    val mllibRatings = data.rateEvents
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        ((uindex, iindex), (r.rating,r.t)) //MODIFIED
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }
-      .reduceByKey { case (v1, v2) => // MODIFIED
-        // if a user may rate same item with different value at different times,
-        // use the latest value for this case.
-        // Can remove this reduceByKey() if no need to support this case.
-        val (rating1, t1) = v1
-        val (rating2, t2) = v2
-        // keep the latest value
-        if (t1 > t2) v1 else v2
-      }
-      .map { case ((u, i), (rating, t)) => // MODIFIED
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, rating) // MODIFIED
-      }.cache()
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.train(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      seed = seed)
-
-    new ALSModel(
-      productFeatures = m.productFeatures.collectAsMap.toMap,
-      itemStringIntMap = itemStringIntMap,
-      items = items
-    )
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-
-    val productFeatures = model.productFeatures
-
-    // convert items to Int index
-    val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
-      .flatten.toSet
-
-    val queryFeatures: Vector[Array[Double]] = queryList.toVector
-      // productFeatures may not contain the requested item
-      .map { item => productFeatures.get(item) }
-      .flatten
-
-    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-
-    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-
-    val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) {
-      logger.info(s"No productFeatures vector for query items ${query.items}.")
-      Array[(Int, Double)]()
-    } else {
-      productFeatures.par // convert to parallel collection
-        .mapValues { f =>
-          queryFeatures.map{ qf =>
-            cosine(qf, f)
-          }.reduce(_ + _)
-        }
-        .filter(_._2 > 0) // keep items with score > 0
-        .seq // convert back to sequential collection
-        .toArray
-    }
-
-    val filteredScore = indexScores.view.filter { case (i, v) =>
-      isCandidateItem(
-        i = i,
-        items = model.items,
-        categories = query.categories,
-        queryList = queryList,
-        whiteList = whiteList,
-        blackList = blackList
-      )
-    }
-
-    val topScores = getTopN(filteredScore, query.num)(ord).toArray
-
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        item = model.itemIntStringMap(i),
-        score = s
-      )
-    }
-
-    new PredictedResult(itemScores)
-  }
-
-  private
-  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
-    val q = PriorityQueue()
-
-    for (x <- s) {
-      if (q.size < n)
-        q.enqueue(x)
-      else {
-        // q is full
-        if (ord.compare(x, q.head) < 0) {
-          q.dequeue()
-          q.enqueue(x)
-        }
-      }
-    }
-
-    q.dequeueAll.toSeq.reverse
-  }
-
-  private
-  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var n1: Double = 0
-    var n2: Double = 0
-    var d: Double = 0
-    while (i < size) {
-      n1 += v1(i) * v1(i)
-      n2 += v2(i) * v2(i)
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
-    if (n1n2 == 0) 0 else (d / n1n2)
-  }
-
-  private
-  def isCandidateItem(
-    i: Int,
-    items: Map[Int, Item],
-    categories: Option[Set[String]],
-    queryList: Set[Int],
-    whiteList: Option[Set[Int]],
-    blackList: Option[Set[Int]]
-  ): Boolean = {
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    blackList.map(!_.contains(i)).getOrElse(true) &&
-    // discard items in query as well
-    (!queryList.contains(i)) &&
-    // filter categories
-    categories.map { cat =>
-      items(i).categories.map { itemCat =>
-        // keep this item if has ovelap categories with the query
-        !(itemCat.toSet.intersect(cat).isEmpty)
-      }.getOrElse(false) // discard this item if it has no categories
-    }.getOrElse(true)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala
deleted file mode 100644
index 46c4b7a..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
-
-    // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "user"
-    )(sc).map { case (entityId, properties) =>
-      val user = try {
-        User()
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" user ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, user)
-    }.cache()
-
-    // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "item"
-    )(sc).map { case (entityId, properties) =>
-      val item = try {
-        // Assume categories is optional property of item.
-        Item(categories = properties.getOpt[List[String]]("categories"))
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" item ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, item)
-    }.cache()
-
-    // get all "user" "rate" "item" events
-    val rateEventsRDD: RDD[RateEvent] = eventsDb.find( //MODIFIED
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("rate")), //MODIFIED
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
-      .map { event =>
-        val rateEvent = try {
-          event.event match {
-            case "rate" => RateEvent( //MODIFIED
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              rating = event.properties.get[Double]("rating"), // ADDED
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
-        } catch {
-          case e: Exception => {
-            logger.error(s"Cannot convert ${event} to RateEvent." + //MODIFIED
-              s" Exception: ${e}.")
-            throw e
-          }
-        }
-        rateEvent
-      }.cache()
-
-    new TrainingData(
-      users = usersRDD,
-      items = itemsRDD,
-      rateEvents = rateEventsRDD
-    )
-  }
-}
-
-case class User()
-
-case class Item(categories: Option[List[String]])
-
-case class RateEvent(user: String, item: String, rating: Double, t: Long)
-
-case class TrainingData(
-  users: RDD[(String, User)],
-  items: RDD[(String, Item)],
-  rateEvents: RDD[RateEvent]
-) {
-  override def toString = {
-    s"users: [${users.count()} (${users.take(2).toList}...)]" +
-    s"items: [${items.count()} (${items.take(2).toList}...)]" +
-    s"rateEvents: [${rateEvents.count()}] (${rateEvents.take(2).toList}...)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala
deleted file mode 100644
index 52b19fe..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  items: List[String],
-  num: Int,
-  categories: Option[Set[String]],
-  whiteList: Option[Set[String]],
-  blackList: Option[Set[String]]
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
-  item: String,
-  score: Double
-)
-
-object SimilarProductEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala
deleted file mode 100644
index c7c2b94..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(
-      users = trainingData.users,
-      items = trainingData.items,
-      rateEvents = trainingData.rateEvents)
-  }
-}
-
-class PreparedData(
-  val users: RDD[(String, User)],
-  val items: RDD[(String, Item)],
-  val rateEvents: RDD[RateEvent]
-) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala
deleted file mode 100644
index 1180afd..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.similarproduct
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override
-  def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/template.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/add-rateevent/template.json b/examples/scala-parallel-similarproduct/add-rateevent/template.json
deleted file mode 100644
index 932e603..0000000
--- a/examples/scala-parallel-similarproduct/add-rateevent/template.json
+++ /dev/null
@@ -1 +0,0 @@
-{"pio": {"version": { "min": "0.9.0" }}}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/.gitignore
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/.gitignore b/examples/scala-parallel-similarproduct/filterbyyear/.gitignore
deleted file mode 100644
index 64fa18b..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-manifest.json
-target/
-pio.log

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/README.md
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/README.md b/examples/scala-parallel-similarproduct/filterbyyear/README.md
deleted file mode 100644
index 5af7bc9..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/README.md
+++ /dev/null
@@ -1,150 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Similar Product Template With Filter by Item Year support
-
-This example engine is based on Similar Product Tempplate version v0.1.1 and is modified to support filter recommendation by the item property 'year'.
-
-For example, recommend movies after year 1990.
-
-## Documentation
-
-Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/
-
-## Development Notes
-
-### Sample data
-
-The movie lens 100k data which is in below format:
-
-UserID::MovieID::Rating::Timestamp
-
-### import ML-100K sample data
-
-Import movielens data using below repository
-https://github.com/k4hoo/systest/tree/master/0.8/ml100k/demo-movielens
-
-```
-$ python -m batch_import <Access Key> http://127.0.0.1:7070
-```
-
-### Changes to Engine.scala
-
-
-1) Added “recommendFromYear” attribute to the Query class. we can pass the “recommendFromYear” attribute from the query request.
-
-```scala
-case class Query(
-  items: List[String],
-  num: Int,
-  categories: Option[Set[String]],
-  whiteList: Option[Set[String]],
-  blackList: Option[Set[String]],
-  recommendFromYear: Option[Int]
-) extends Serializable
-```
-
-2)  Added “year” attribute to the class ItemScore.
-
-```scala
-case class ItemScore(
-  item: String,
-  score: Double,
-  year: Int
-) extends Serializable
-
-```
-
-### Changes to DataSource.scala
-
-1) Added attribute “year” to the class Item
-
-```scala
-case class Item(categories: Option[List[String]],year: Int)
-```
-
-2) In the eventsDb.aggregateProperties, adding year property
-
-```scala
-  Item(categories = properties.getOpt[List[String]]("categories"),year = properties.get[Int]("year"))
-```
-
-### Changes to ALSAlgorihm.scala
-
-
-1) In the predict method, passing “recommendFromYear” attribute to the isCandidateItem method
-
-```scala
-  isCandidateItem(
-    i = i,
-    items = model.items,
-    categories = query.categories,
-    queryList = queryList,
-    whiteList = whiteList,
-    blackList = blackList,
-    recommendFromYear = query.recommendFromYear
-  )
-```
-
-2) In “isCandidateItem” method, verifying if Item’s year is greater than “recommendFromYear” attribute.
-
-```scala
-  private def isCandidateItem(
-    i: Int,
-    items: Map[Int, Item],
-    categories: Option[Set[String]],
-    queryList: Set[Int],
-    whiteList: Option[Set[Int]],
-    blackList: Option[Set[Int]],
-    recommendFromYear: Option[Int]
-  ): Boolean = {
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    blackList.map(!_.contains(i)).getOrElse(true) &&
-    // discard items in query as well
-    (!queryList.contains(i)) &&
-    // filter categories
-    items(i).year > recommendFromYear.getOrElse(1) &&
-    categories.map { cat =>
-      items(i).categories.map { itemCat =>
-        // keep this item if has ovelap categories with the query
-        !(itemCat.toSet.intersect(cat).isEmpty)
-      }.getOrElse(false) // discard this item if it has no categories
-    }.getOrElse(true)
-  }
-```
-
-3)  In the predict method, returning year as well as part of ItemScore
-
-```scala
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        item = model.itemIntStringMap(i),
-        score = s,
-        year = model.items(i).year
-      )
-    }
-
-    new PredictedResult(itemScores)
-```
-
-### Example Request
-
-```
-curl -H "Content-Type: application/json" \
--d '{ "items": ["171"], "num": 10, "recommendFromYear":1990 }' \
-http://localhost:8000/queries.json
-```

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/build.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/build.sbt b/examples/scala-parallel-similarproduct/filterbyyear/build.sbt
deleted file mode 100644
index 1680f6b..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-similarproduct"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio"    %% "core"          % "0.10.0-SNAPSHOT" % "provided",
-  "org.apache.spark" %% "spark-core"    % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib"   % "1.2.0" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py b/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py
deleted file mode 100644
index 6107d1c..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py
+++ /dev/null
@@ -1,90 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for similar product engine
-"""
-
-import predictionio
-import argparse
-import random
-
-SEED = 3
-
-def import_events(client):
-  random.seed(SEED)
-  count = 0
-  print client.get_status()
-  print "Importing data..."
-
-  # generate 10 users, with user ids u1,u2,....,u10
-  user_ids = ["u%s" % i for i in range(1, 11)]
-  for user_id in user_ids:
-    print "Set user", user_id
-    client.create_event(
-      event="$set",
-      entity_type="user",
-      entity_id=user_id
-    )
-    count += 1
-
-  # generate 50 items, with item ids i1,i2,....,i50
-  # random assign 1 to 4 categories among c1-c6 to items
-  categories = ["c%s" % i for i in range(1, 7)]
-  item_ids = ["i%s" % i for i in range(1, 51)]
-  for item_id in item_ids:
-    print "Set item", item_id
-    client.create_event(
-      event="$set",
-      entity_type="item",
-      entity_id=item_id,
-      properties={
-        "categories" : random.sample(categories, random.randint(1, 4))
-      }
-    )
-    count += 1
-
-  # each user randomly viewed 10 items
-  for user_id in user_ids:
-    for viewed_item in random.sample(item_ids, 10):
-      print "User", user_id ,"views item", viewed_item
-      client.create_event(
-        event="view",
-        entity_type="user",
-        entity_id=user_id,
-        target_entity_type="item",
-        target_entity_id=viewed_item
-      )
-      count += 1
-
-  print "%s events are imported." % count
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for similar product engine")
-  parser.add_argument('--access_key', default='invald_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py b/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py
deleted file mode 100644
index 8678b15..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"items": ["i1", "i3"], "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/engine.json
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/engine.json b/examples/scala-parallel-similarproduct/filterbyyear/engine.json
deleted file mode 100644
index 9fc4958..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/engine.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "com.test.SimilarProductEngine",
-  "datasource": {
-    "params" : {
-      "appId": 1
-      "eventWindow": {
-        "duration": "5 minutes",
-        "removeDuplicates": true,
-        "compressProperties": true
-      }
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations" : 20,
-        "lambda": 0.01,
-        "seed": 3
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt b/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 1604d74..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.test
-
-import org.apache.predictionio.controller.P2LAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-
-import grizzled.slf4j.Logger
-
-import scala.collection.mutable.PriorityQueue
-
-case class ALSAlgorithmParams(
-  rank: Int,
-  numIterations: Int,
-  lambda: Double,
-  seed: Option[Long]) extends Params
-
-class ALSModel(
-  val productFeatures: Map[Int, Array[Double]],
-  val itemStringIntMap: BiMap[String, Int],
-  val items: Map[Int, Item]
-) extends Serializable {
-
-  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
-
-  override def toString = {
-    s" productFeatures: [${productFeatures.size}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2).toString}...)]" +
-    s" items: [${items.size}]" +
-    s"(${items.take(2).toString}...)]"
-  }
-}
-
-/**
-  * Use ALS to build item x feature matrix
-  */
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(sc: SparkContext, data: PreparedData): ALSModel = {
-    require(!data.viewEvents.take(1).isEmpty,
-      s"viewEvents in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.users.take(1).isEmpty,
-      s"users in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    require(!data.items.take(1).isEmpty,
-      s"items in PreparedData cannot be empty." +
-      " Please check if DataSource generates TrainingData" +
-      " and Preprator generates PreparedData correctly.")
-    // create User and item's String ID to integer index BiMap
-    val userStringIntMap = BiMap.stringInt(data.users.keys)
-    val itemStringIntMap = BiMap.stringInt(data.items.keys)
-
-    // collect Item as Map and convert ID to Int index
-    val items: Map[Int, Item] = data.items.map { case (id, item) =>
-      (itemStringIntMap(id), item)
-    }.collectAsMap.toMap
-
-    val mllibRatings = data.viewEvents
-      .map { r =>
-        // Convert user and item String IDs to Int index for MLlib
-        val uindex = userStringIntMap.getOrElse(r.user, -1)
-        val iindex = itemStringIntMap.getOrElse(r.item, -1)
-
-        if (uindex == -1)
-          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
-            + " to Int index.")
-
-        if (iindex == -1)
-          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
-            + " to Int index.")
-
-        ((uindex, iindex), 1)
-      }.filter { case ((u, i), v) =>
-        // keep events with valid user and item index
-        (u != -1) && (i != -1)
-      }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
-      .map { case ((u, i), v) =>
-        // MLlibRating requires integer index for user and item
-        MLlibRating(u, i, v)
-      }
-
-    // MLLib ALS cannot handle empty training data.
-    require(!mllibRatings.take(1).isEmpty,
-      s"mllibRatings cannot be empty." +
-      " Please check if your events contain valid user and item ID.")
-
-    // seed for MLlib ALS
-    val seed = ap.seed.getOrElse(System.nanoTime)
-
-    val m = ALS.trainImplicit(
-      ratings = mllibRatings,
-      rank = ap.rank,
-      iterations = ap.numIterations,
-      lambda = ap.lambda,
-      blocks = -1,
-      alpha = 1.0,
-      seed = seed)
-
-    new ALSModel(
-      productFeatures = m.productFeatures.collectAsMap.toMap,
-      itemStringIntMap = itemStringIntMap,
-      items = items
-    )
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-
-    val productFeatures = model.productFeatures
-
-    // convert items to Int index
-    val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_))
-      .flatten.toSet
-
-    val queryFeatures: Vector[Array[Double]] = queryList.toVector
-      // productFeatures may not contain the requested item
-      .map { item => productFeatures.get(item) }
-      .flatten
-
-    val whiteList: Option[Set[Int]] = query.whiteList.map( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-    val blackList: Option[Set[Int]] = query.blackList.map ( set =>
-      set.map(model.itemStringIntMap.get(_)).flatten
-    )
-
-    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
-
-    val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) {
-      logger.info(s"No productFeatures vector for query items ${query.items}.")
-      Array[(Int, Double)]()
-    } else {
-      productFeatures.par // convert to parallel collection
-        .mapValues { f =>
-          queryFeatures.map{ qf =>
-            cosine(qf, f)
-          }.reduce(_ + _)
-        }
-        .filter(_._2 > 0) // keep items with score > 0
-        .seq // convert back to sequential collection
-        .toArray
-    }
-
-    val filteredScore = indexScores.view.filter { case (i, v) =>
-      isCandidateItem(
-        i = i,
-        items = model.items,
-        categories = query.categories,
-        queryList = queryList,
-        whiteList = whiteList,
-        blackList = blackList,
-       recommendFromYear = query.recommendFromYear
-      )
-    }
-
-    val topScores = getTopN(filteredScore, query.num)(ord).toArray
-
-    val itemScores = topScores.map { case (i, s) =>
-      new ItemScore(
-        item = model.itemIntStringMap(i),
-        score = s,
-        year = model.items(i).year
-      )
-    }
-
-    new PredictedResult(itemScores)
-  }
-
-  private
-  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
-
-    val q = PriorityQueue()
-
-    for (x <- s) {
-      if (q.size < n)
-        q.enqueue(x)
-      else {
-        // q is full
-        if (ord.compare(x, q.head) < 0) {
-          q.dequeue()
-          q.enqueue(x)
-        }
-      }
-    }
-
-    q.dequeueAll.toSeq.reverse
-  }
-
-  private
-  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
-    val size = v1.size
-    var i = 0
-    var n1: Double = 0
-    var n2: Double = 0
-    var d: Double = 0
-    while (i < size) {
-      n1 += v1(i) * v1(i)
-      n2 += v2(i) * v2(i)
-      d += v1(i) * v2(i)
-      i += 1
-    }
-    val n1n2 = (math.sqrt(n1) * math.sqrt(n2))
-    if (n1n2 == 0) 0 else (d / n1n2)
-  }
-
-  private
-  def isCandidateItem(
-    i: Int,
-    items: Map[Int, Item],
-    categories: Option[Set[String]],
-    queryList: Set[Int],
-    whiteList: Option[Set[Int]],
-    blackList: Option[Set[Int]],
-    recommendFromYear: Option[Int]
-  ): Boolean = {
-    whiteList.map(_.contains(i)).getOrElse(true) &&
-    blackList.map(!_.contains(i)).getOrElse(true) &&
-    // discard items in query as well
-    (!queryList.contains(i)) && 
-    items(i).year > recommendFromYear.getOrElse(1) &&
-    // filter categories
-    categories.map { cat =>
-      items(i).categories.map { itemCat =>
-        // keep this item if has ovelap categories with the query
-        !(itemCat.toSet.intersect(cat).isEmpty)
-      }.getOrElse(false) // discard this item if it has no categories
-    }.getOrElse(true)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala
deleted file mode 100644
index 89e80ce..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.test
-
-import org.apache.predictionio.core.SelfCleaningDataSource
-import org.apache.predictionio.core.EventWindow
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appName: String, eventWindow: Option[EventWindow], appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] with SelfCleaningDataSource {
-
-  @transient override lazy val logger = Logger[this.type]
-
-  override def appName = dsp.appName
-  override def eventWindow = dsp.eventWindow
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    cleanPersistedPEvents(sc)
-
-    val eventsDb = Storage.getPEvents()
-
-    // create a RDD of (entityID, User)
-    val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "user"
-    )(sc).map { case (entityId, properties) =>
-      val user = try {
-        User()
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" user ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, user)
-    }.cache()
-
-    // create a RDD of (entityID, Item)
-    val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties(
-      appId = dsp.appId,
-      entityType = "item"
-    )(sc).map { case (entityId, properties) =>
-      val item = try {
-        // Assume categories is optional property of item.
-        Item(categories = properties.getOpt[List[String]]("categories"), year = properties.get[Int]("year"))
-      } catch {
-        case e: Exception => {
-          logger.error(s"Failed to get properties ${properties} of" +
-            s" item ${entityId}. Exception: ${e}.")
-          throw e
-        }
-      }
-      (entityId, item)
-    }.cache()
-
-    // get all "user" "view" "item" events
-    val viewEventsRDD: RDD[ViewEvent] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("view")),
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-      // eventsDb.find() returns RDD[Event]
-      .map { event =>
-        val viewEvent = try {
-          event.event match {
-            case "view" => ViewEvent(
-              user = event.entityId,
-              item = event.targetEntityId.get,
-              t = event.eventTime.getMillis)
-            case _ => throw new Exception(s"Unexpected event ${event} is read.")
-          }
-        } catch {
-          case e: Exception => {
-            logger.error(s"Cannot convert ${event} to ViewEvent." +
-              s" Exception: ${e}.")
-            throw e
-          }
-        }
-        viewEvent
-      }.cache()
-
-    new TrainingData(
-      users = usersRDD,
-      items = itemsRDD,
-      viewEvents = viewEventsRDD
-    )
-  }
-}
-
-case class User()
-
-case class Item(categories: Option[List[String]], year:Int)
-
-case class ViewEvent(user: String, item: String, t: Long)
-
-class TrainingData(
-  val users: RDD[(String, User)],
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable {
-  override def toString = {
-    s"users: [${users.count()} (${users.take(2).toList}...)]" +
-    s"items: [${items.count()} (${items.take(2).toList}...)]" +
-    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala
deleted file mode 100644
index 6800c7b..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.test
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  items: List[String],
-  num: Int,
-  categories: Option[Set[String]],
-  whiteList: Option[Set[String]],
-  blackList: Option[Set[String]],
-  recommendFromYear: Option[Int]
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
-  item: String,
-  score: Double,
-  year: Int
-)
-
-object SimilarProductEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala
deleted file mode 100644
index 189b7b3..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.test
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(
-      users = trainingData.users,
-      items = trainingData.items,
-      viewEvents = trainingData.viewEvents)
-  }
-}
-
-class PreparedData(
-  val users: RDD[(String, User)],
-  val items: RDD[(String, Item)],
-  val viewEvents: RDD[ViewEvent]
-) extends Serializable
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala
deleted file mode 100644
index cd1a715..0000000
--- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.test
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore
new file mode 100644
index 0000000..5dbe602
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore
@@ -0,0 +1,4 @@
+manifest.json
+target/
+pio.log
+/pio.sbt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt
new file mode 100644
index 0000000..1daded6
--- /dev/null
+++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+name := "template-scala-parallel-similarproduct"
+
+organization := "org.apache.predictionio"
+scalaVersion := "2.11.8"
+libraryDependencies ++= Seq(
+  "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided",
+  "org.apache.spark"        %% "spark-mllib"              % "2.1.1" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py
 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py
new file mode 100644
index 0000000..4a5cccf
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Import sample data for similar product engine
+"""
+
+import predictionio
+import argparse
+import random
+
+SEED = 3
+
+def import_events(client):
+  random.seed(SEED)
+  count = 0
+  print(client.get_status())
+  print("Importing data...")
+
+  # generate 10 users, with user ids u1,u2,....,u10
+  user_ids = ["u%s" % i for i in range(1, 11)]
+  for user_id in user_ids:
+    print("Set user", user_id)
+    client.create_event(
+      event="$set",
+      entity_type="user",
+      entity_id=user_id
+    )
+    count += 1
+
+  # generate 50 items, with item ids i1,i2,....,i50
+  # random assign 1 to 4 categories among c1-c6 to items
+  categories = ["c%s" % i for i in range(1, 7)]
+  item_ids = ["i%s" % i for i in range(1, 51)]
+  for item_id in item_ids:
+    print("Set item", item_id)
+    client.create_event(
+      event="$set",
+      entity_type="item",
+      entity_id=item_id,
+      properties={
+        "categories" : random.sample(categories, random.randint(1, 4))
+      }
+    )
+    count += 1
+
+  # each user randomly viewed 10 items
+  for user_id in user_ids:
+    for viewed_item in random.sample(item_ids, 10):
+      print("User", user_id ,"views item", viewed_item)
+      client.create_event(
+        event="view",
+        entity_type="user",
+        entity_id=user_id,
+        target_entity_type="item",
+        target_entity_id=viewed_item
+      )
+      count += 1
+
+  # each user randomly liked/disliked 10 items
+  for user_id in user_ids:
+    for viewed_item in random.sample(item_ids, 10):
+      if random.choice((False, True)) :
+        print "User", user_id ,"likes item", viewed_item
+        client.create_event(
+          event="like",
+          entity_type="user",
+          entity_id=user_id,
+          target_entity_type="item",
+          target_entity_id=viewed_item
+        )
+      else:
+        print "User", user_id ,"dislikes item", viewed_item
+        client.create_event(
+          event="dislike",
+          entity_type="user",
+          entity_id=user_id,
+          target_entity_type="item",
+          target_entity_id=viewed_item
+        )
+      count += 1
+
+  print("%s events are imported." % count)
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(
+    description="Import sample data for similar product engine")
+  parser.add_argument('--access_key', default='invald_access_key')
+  parser.add_argument('--url', default="http://localhost:7070";)
+
+  args = parser.parse_args()
+  print(args)
+
+  client = predictionio.EventClient(
+    access_key=args.access_key,
+    url=args.url,
+    threads=5,
+    qsize=500)
+  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py
 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py
new file mode 100644
index 0000000..0a70f28
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Send sample query to prediction engine
+"""
+
+import predictionio
+engine_client = predictionio.EngineClient(url="http://localhost:8000";)
+print(engine_client.send_query({"items": ["i1", "i3"], "num": 4}))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json
 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json
new file mode 100644
index 0000000..c31b88e
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json
@@ -0,0 +1,18 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "cooccurrence",
+      "params": {
+        "n": 20
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json
new file mode 100644
index 0000000..dd27a2d
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json
@@ -0,0 +1,30 @@
+{
+  "id": "default",
+  "description": "Default settings",
+  "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine",
+  "datasource": {
+    "params" : {
+      "appName": "MyApp1"
+    }
+  },
+  "algorithms": [
+    {
+      "name": "als",
+      "params": {
+        "rank": 10,
+        "numIterations" : 20,
+        "lambda": 0.01,
+        "seed": 3
+      }
+    },
+    {
+      "name": "likealgo",
+      "params": {
+        "rank": 8,
+        "numIterations" : 15,
+        "lambda": 0.01,
+        "seed": 3
+      }
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt
 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt
new file mode 100644
index 0000000..e17409e
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties
 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties
new file mode 100644
index 0000000..64317fd
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties
@@ -0,0 +1 @@
+sbt.version=0.13.15

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala
 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala
new file mode 100644
index 0000000..aae8322
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.recommendation.ALS
+import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
+
+import grizzled.slf4j.Logger
+
+import scala.collection.mutable.PriorityQueue
+
+/** Parameters of [[ALSAlgorithm]]; `seed` falls back to `System.nanoTime`. */
+case class ALSAlgorithmParams(
+  rank: Int, numIterations: Int,
+  lambda: Double, seed: Option[Long]
+) extends Params
+
+/** Serializable model produced by [[ALSAlgorithm]]: item feature vectors,
+  * the item String-ID/Int-index mapping and the items keyed by Int index. */
+class ALSModel(
+  val productFeatures: Map[Int, Array[Double]],
+  val itemStringIntMap: BiMap[String, Int],
+  val items: Map[Int, Item]
+) extends Serializable {
+  // Int index -> String ID; transient, so it is rebuilt lazily on first use
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  /** Size and first two entries of each member. */
+  override def toString = Seq(
+    s" productFeatures: [${productFeatures.size}](${productFeatures.take(2).toList}...)",
+    s" itemStringIntMap: [${itemStringIntMap.size}](${itemStringIntMap.take(2).toString}...)]",
+    s" items: [${items.size}](${items.take(2).toString}...)]"
+  ).mkString
+}
+
+/**
+  * Use ALS to build item x feature matrix
+  */
+class ALSAlgorithm(val ap: ALSAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  /** Trains implicit-feedback ALS on per-(user, item) view counts and keeps
+    * the resulting item feature vectors. Fails a `require` when viewEvents,
+    * users, items or the converted ratings are empty.
+    */
+  def train(sc: SparkContext, data: PreparedData): ALSModel = {
+    // built only when a check fails, since require takes its message by name
+    def emptyInput(name: String): String =
+      s"$name in PreparedData cannot be empty." +
+      " Please check if DataSource generates TrainingData" +
+      " and Preparator generates PreparedData correctly."
+    require(!data.viewEvents.take(1).isEmpty, emptyInput("viewEvents"))
+    require(!data.users.take(1).isEmpty, emptyInput("users"))
+    require(!data.items.take(1).isEmpty, emptyInput("items"))
+    // create User and item's String ID to integer index BiMap
+    val userStringIntMap = BiMap.stringInt(data.users.keys)
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    // collect Item as Map and convert ID to Int index
+    val items: Map[Int, Item] = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }.collectAsMap.toMap
+
+    val mllibRatings = data.viewEvents
+      .map { r =>
+        // Convert user and item String IDs to Int index for MLlib
+        val uindex = userStringIntMap.getOrElse(r.user, -1)
+        val iindex = itemStringIntMap.getOrElse(r.item, -1)
+
+        if (uindex == -1)
+          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
+            + " to Int index.")
+
+        if (iindex == -1)
+          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
+            + " to Int index.")
+
+        ((uindex, iindex), 1)
+      }.filter { case ((u, i), v) =>
+        // keep events with valid user and item index
+        (u != -1) && (i != -1)
+      }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
+      .map { case ((u, i), v) =>
+        // MLlibRating requires integer index for user and item
+        MLlibRating(u, i, v)
+      }
+      .cache()
+
+    // MLLib ALS cannot handle empty training data.
+    require(!mllibRatings.take(1).isEmpty,
+      "mllibRatings cannot be empty." +
+      " Please check if your events contain valid user and item ID.")
+
+    // seed for MLlib ALS
+    val seed = ap.seed.getOrElse(System.nanoTime)
+
+    val m = ALS.trainImplicit(
+      ratings = mllibRatings,
+      rank = ap.rank,
+      iterations = ap.numIterations,
+      lambda = ap.lambda,
+      blocks = -1,
+      alpha = 1.0,
+      seed = seed)
+
+    new ALSModel(
+      productFeatures = m.productFeatures.collectAsMap.toMap,
+      itemStringIntMap = itemStringIntMap,
+      items = items
+    )
+  }
+
+  /** Scores every item by the sum of its cosine similarities to the query
+    * items, keeps positive scores that pass the query filters, and returns
+    * the `query.num` highest-scoring items.
+    */
+  def predict(model: ALSModel, query: Query): PredictedResult = {
+    val features = model.productFeatures
+
+    // String ID -> Int index; IDs unknown to the model are dropped
+    def toIndex(id: String): Option[Int] = model.itemStringIntMap.get(id)
+
+    val queryList: Set[Int] = query.items.flatMap(toIndex(_)).toSet
+
+    // productFeatures may not contain the requested item
+    val queryFeatures: Vector[Array[Double]] =
+      queryList.toVector.flatMap(features.get(_))
+
+    val whiteList: Option[Set[Int]] =
+      query.whiteList.map(_.flatMap(toIndex(_)))
+    val blackList: Option[Set[Int]] =
+      query.blackList.map(_.flatMap(toIndex(_)))
+
+    // reversed so that getTopN retains the highest scores
+    val ord = Ordering.by[(Int, Double), Double](_._2).reverse
+
+    val indexScores: Array[(Int, Double)] =
+      if (queryFeatures.nonEmpty) {
+        features.par // convert to parallel collection
+          // score = sum of cosine similarities to all query items
+          .mapValues(f => queryFeatures.map(cosine(_, f)).reduce(_ + _))
+          .filter { case (_, score) => score > 0 } // keep items with score > 0
+          .seq // convert back to sequential collection
+          .toArray
+      } else {
+        logger.info(s"No productFeatures vector for query items ${query.items}.")
+        Array.empty[(Int, Double)]
+      }
+
+    val candidates = indexScores.view.filter { case (index, _) =>
+      isCandidateItem(
+        i = index,
+        items = model.items,
+        categories = query.categories,
+        categoryBlackList = query.categoryBlackList,
+        queryList = queryList,
+        whiteList = whiteList,
+        blackList = blackList
+      )
+    }
+
+    val topScores = getTopN(candidates, query.num)(ord).toArray
+
+    val itemScores = topScores.map { case (index, score) =>
+      new ItemScore(
+        item = model.itemIntStringMap(index),
+        score = score
+      )
+    }
+
+    new PredictedResult(itemScores)
+  }
+
+  /** Returns the `n` smallest elements of `s` under `ord`, in ascending `ord`
+    * order; an empty Seq when `n <= 0`, where `q.head` would otherwise throw. */
+  private
+  def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = {
+    // max-heap under `ord`: its head is the worst element currently retained
+    val q = PriorityQueue[T]()
+    if (n > 0) {
+      for (x <- s) {
+        if (q.size < n) {
+          q.enqueue(x)
+        } else if (ord.compare(x, q.head) < 0) {
+          // q is full and x beats the worst kept element: swap them
+          q.dequeue()
+          q.enqueue(x)
+        }
+      }
+    }
+    q.dequeueAll.toSeq.reverse
+  }
+
+  /** Cosine similarity of `v1` and `v2`, taken over the indices of `v1`;
+    * evaluates to 0 when the product of the two norms is 0.
+    */
+  private
+  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
+    var normSq1 = 0.0
+    var normSq2 = 0.0
+    var dot = 0.0
+    for (k <- v1.indices) {
+      normSq1 += v1(k) * v1(k)
+      normSq2 += v2(k) * v2(k)
+      dot += v1(k) * v2(k)
+    }
+    val denom = math.sqrt(normSq1) * math.sqrt(normSq2)
+    if (denom == 0) 0 else dot / denom
+  }
+
+  /** Decides whether item index `i` may be returned for a query.
+    *
+    * An item is kept only if it is in `whiteList` (when given), is not in
+    * `blackList` (when given), is not one of the query items, shares at least
+    * one category with `categories` (when given; items without categories are
+    * then discarded) and shares none with `categoryBlackList` (when given;
+    * items without categories are then kept).
+    */
+  private
+  def isCandidateItem(
+    i: Int,
+    items: Map[Int, Item],
+    categories: Option[Set[String]],
+    categoryBlackList: Option[Set[String]],
+    queryList: Set[Int],
+    whiteList: Option[Set[Int]],
+    blackList: Option[Set[Int]]
+  ): Boolean = {
+    // categories of item i as a Set, if it has any; looked up only on demand
+    def itemCategories = items(i).categories.map(_.toSet)
+
+    whiteList.forall(_.contains(i)) &&
+    blackList.forall(!_.contains(i)) &&
+    // discard items in query as well
+    !queryList.contains(i) &&
+    categories.forall(wanted => itemCategories.exists(_.intersect(wanted).nonEmpty)) &&
+    categoryBlackList.forall(banned => itemCategories.forall(_.intersect(banned).isEmpty))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala
----------------------------------------------------------------------
diff --git 
a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala
 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala
new file mode 100644
index 0000000..30d0b3e
--- /dev/null
+++ 
b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** @param n number of top co-occurring items kept for each item */
+case class CooccurrenceAlgorithmParams(
+  n: Int) extends Params
+
+/** Serializable model produced by [[CooccurrenceAlgorithm]].
+  * `topCooccurrences` maps an item index to (other item index, count) pairs.
+  */
+class CooccurrenceModel(
+  val topCooccurrences: Map[Int, Array[(Int, Int)]],
+  val itemStringIntMap: BiMap[String, Int],
+  val items: Map[Int, Item]
+) extends Serializable {
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  override def toString(): String = topCooccurrences.mapValues(_.mkString(",")).toString
+}
+
+class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, CooccurrenceModel, Query, PredictedResult] {
+
+  /** Builds the model: item String-ID/Int-index BiMap, the top `ap.n`
+    * co-occurring items of each item, and the items keyed by Int index.
+    */
+  def train(sc: SparkContext, data: PreparedData): CooccurrenceModel = {
+    val indexOf = BiMap.stringInt(data.items.keys)
+
+    val cooccurrences = trainCooccurrence(
+      events = data.viewEvents,
+      n = ap.n,
+      itemStringIntMap = indexOf)
+
+    // re-key each Item by its Int index and collect the result as a Map
+    val indexedItems: Map[Int, Item] = data.items
+      .map { case (id, item) => indexOf(id) -> item }
+      .collectAsMap
+      .toMap
+
+    new CooccurrenceModel(
+      topCooccurrences = cooccurrences,
+      itemStringIntMap = indexOf,
+      items = indexedItems)
+  }
+
+  /** Given the user-item events, finds the top `n` co-occurring items for
+    * each item, keyed by item index, as (other item index, count) pairs. */
+  def trainCooccurrence(
+    events: RDD[ViewEvent],
+    n: Int,
+    itemStringIntMap: BiMap[String, Int]): Map[Int, Array[(Int, Int)]] = {
+
+    val userItem = events
+      // map item from string to integer index
+      .flatMap {
+        case ViewEvent(user, item, _) if itemStringIntMap.contains(item) =>
+          Some((user, itemStringIntMap(item)))
+        case _ => None
+      }
+      // if user view same item multiple times, only count as once
+      .distinct()
+      .cache()
+
+    val cooccurrences: RDD[((Int, Int), Int)] = userItem.join(userItem)
+      // drop self pairs and keep one order per pair, e.g. (a,b) but not (b,a)
+      .filter { case (user, (item1, item2)) => item1 < item2 }
+      .map { case (user, (item1, item2)) => ((item1, item2), 1) }
+      .reduceByKey{ (a: Int, b: Int) => a + b }
+
+    cooccurrences
+      // emit the count under both items of the pair
+      .flatMap { case ((item1, item2), count) =>
+        Seq((item1, (item2, count)), (item2, (item1, count)))
+      }
+      .groupByKey
+      .map { case (item, itemCounts) =>
+        (item, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
+      }
+      .collectAsMap.toMap
+  }
+
+  /** Sums, over the query items, the co-occurrence counts of every item in
+    * their top lists and returns the `query.num` candidates with the highest
+    * totals.
+    */
+  def predict(model: CooccurrenceModel, query: Query): PredictedResult = {
+    // String ID -> Int index; IDs unknown to the model are dropped
+    def toIndex(id: String): Option[Int] = model.itemStringIntMap.get(id)
+
+    val queryList: Set[Int] = query.items.flatMap(toIndex(_)).toSet
+
+    val whiteList: Option[Set[Int]] =
+      query.whiteList.map(_.flatMap(toIndex(_)))
+
+    val blackList: Option[Set[Int]] =
+      query.blackList.map(_.flatMap(toIndex(_)))
+
+    // total co-occurrence count per item index across all query items
+    val counts: Array[(Int, Int)] = queryList.toVector
+      .flatMap { q => model.topCooccurrences.getOrElse(q, Array()) }
+      .groupBy(_._1)
+      .map { case (index, group) => (index, group.map(_._2).sum) }
+      .toArray
+
+    val ranked = counts
+      .filter { case (index, _) =>
+        isCandidateItem(
+          i = index,
+          items = model.items,
+          categories = query.categories,
+          queryList = queryList,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .sortBy(_._2)(Ordering.Int.reverse)
+      .take(query.num)
+
+    val itemScores = ranked.map { case (index, count) =>
+      ItemScore(
+        item = model.itemIntStringMap(index),
+        score = count
+      )
+    }
+
+    new PredictedResult(itemScores)
+  }
+
+  /** Decides whether item index `i` may be returned for a query: it must be
+    * in `whiteList` (when given), not in `blackList` (when given), not among
+    * the query items and, when `categories` is given, share a category with it.
+    */
+  private
+  def isCandidateItem(
+    i: Int,
+    items: Map[Int, Item],
+    categories: Option[Set[String]],
+    queryList: Set[Int],
+    whiteList: Option[Set[Int]],
+    blackList: Option[Set[Int]]
+  ): Boolean = {
+    whiteList.forall(_.contains(i)) &&
+    blackList.forall(!_.contains(i)) &&
+    !queryList.contains(i) &&
+    categories.forall { wanted =>
+      // an item without categories never matches a category filter
+      items(i).categories.exists(_.toSet.intersect(wanted).nonEmpty)
+    }
+  }
+
+}


Reply via email to