http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
deleted file mode 100644
index 7f50488..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESAccessKeys.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.AccessKey
-import org.apache.predictionio.data.storage.AccessKeys
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-import scala.util.Random
-
-/** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
-    extends AccessKeys with Logging {
-  implicit val formats = DefaultFormats.lossless
-  private val estype = "accesskeys"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(accessKey: AccessKey): Option[String] = {
-    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
-    update(accessKey.copy(key = key))
-    Some(key)
-  }
-
-  def get(key: String): Option[AccessKey] = {
-    try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        key).get()
-      Some(read[AccessKey](response.getSourceAsString))
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-      case e: NullPointerException => None
-    }
-  }
-
-  def getAll(): Seq[AccessKey] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[AccessKey](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[AccessKey]()
-    }
-  }
-
-  def getByAppid(appid: Int): Seq[AccessKey] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("appid", appid))
-      ESUtils.getAll[AccessKey](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[AccessKey]()
-    }
-  }
-
-  def update(accessKey: AccessKey): Unit = {
-    try {
-      client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get()
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-
-  def delete(key: String): Unit = {
-    try {
-      client.prepareDelete(index, estype, key).get
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
deleted file mode 100644
index af61e17..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESApps.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.App
-import org.apache.predictionio.data.storage.Apps
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-/** Elasticsearch implementation of Items. */
-class ESApps(client: Client, config: StorageClientConfig, index: String)
-  extends Apps with Logging {
-  implicit val formats = DefaultFormats.lossless
-  private val estype = "apps"
-  private val seq = new ESSequences(client, config, index)
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(app: App): Option[Int] = {
-    val id =
-      if (app.id == 0) {
-        var roll = seq.genNext("apps")
-        while (!get(roll).isEmpty) roll = seq.genNext("apps")
-        roll
-      }
-      else app.id
-    val realapp = app.copy(id = id)
-    update(realapp)
-    Some(id)
-  }
-
-  def get(id: Int): Option[App] = {
-    try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        id.toString).get()
-      Some(read[App](response.getSourceAsString))
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-      case e: NullPointerException => None
-    }
-  }
-
-  def getByName(name: String): Option[App] = {
-    try {
-      val response = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("name", name)).get
-      val hits = response.getHits().hits()
-      if (hits.size > 0) {
-        Some(read[App](hits.head.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[App] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[App](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[App]()
-    }
-  }
-
-  def update(app: App): Unit = {
-    try {
-      val response = client.prepareIndex(index, estype, app.id.toString).
-        setSource(write(app)).get()
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-
-  def delete(id: Int): Unit = {
-    try {
-      client.prepareDelete(index, estype, id.toString).get
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
deleted file mode 100644
index f955bee..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESChannels.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.Channel
-import org.apache.predictionio.data.storage.Channels
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders.termFilter
-import org.json4s.DefaultFormats
-import org.json4s.JsonDSL._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESChannels(client: Client, config: StorageClientConfig, index: String)
-    extends Channels with Logging {
-
-  implicit val formats = DefaultFormats.lossless
-  private val estype = "channels"
-  private val seq = new ESSequences(client, config, index)
-  private val seqName = "channels"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(channel: Channel): Option[Int] = {
-    val id =
-      if (channel.id == 0) {
-        var roll = seq.genNext(seqName)
-        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
-        roll
-      } else channel.id
-
-    val realChannel = channel.copy(id = id)
-    if (update(realChannel)) Some(id) else None
-  }
-
-  def get(id: Int): Option[Channel] = {
-    try {
-      val response = client.prepareGet(
-        index,
-        estype,
-        id.toString).get()
-      Some(read[Channel](response.getSourceAsString))
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-      case e: NullPointerException => None
-    }
-  }
-
-  def getByAppid(appid: Int): Seq[Channel] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).
-        setPostFilter(termFilter("appid", appid))
-      ESUtils.getAll[Channel](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq[Channel]()
-    }
-  }
-
-  def update(channel: Channel): Boolean = {
-    try {
-      val response = client.prepareIndex(index, estype, channel.id.toString).
-        setSource(write(channel)).get()
-      true
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        false
-    }
-  }
-
-  def delete(id: Int): Unit = {
-    try {
-      client.prepareDelete(index, estype, id.toString).get
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
deleted file mode 100644
index cc10ff0..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineInstances.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineInstance
-import org.apache.predictionio.data.storage.EngineInstanceSerializer
-import org.apache.predictionio.data.storage.EngineInstances
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEngineInstances(client: Client, config: StorageClientConfig, index: String)
-  extends EngineInstances with Logging {
-  implicit val formats = DefaultFormats + new EngineInstanceSerializer
-  private val estype = "engine_instances"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineVersion" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineVariant" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineFactory" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("batch" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("dataSourceParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("preparatorParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("algorithmsParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("servingParams" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(i: EngineInstance): String = {
-    try {
-      val response = client.prepareIndex(index, estype).
-        setSource(write(i)).get
-      response.getId
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        ""
-    }
-  }
-
-  def get(id: String): Option[EngineInstance] = {
-    try {
-      val response = client.prepareGet(index, estype, id).get
-      if (response.isExists) {
-        Some(read[EngineInstance](response.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[EngineInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[EngineInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def getCompleted(
-      engineId: String,
-      engineVersion: String,
-      engineVariant: String): Seq[EngineInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
-        andFilter(
-          termFilter("status", "COMPLETED"),
-          termFilter("engineId", engineId),
-          termFilter("engineVersion", engineVersion),
-          termFilter("engineVariant", engineVariant))).
-        addSort("startTime", SortOrder.DESC)
-      ESUtils.getAll[EngineInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def getLatestCompleted(
-      engineId: String,
-      engineVersion: String,
-      engineVariant: String): Option[EngineInstance] =
-    getCompleted(
-      engineId,
-      engineVersion,
-      engineVariant).headOption
-
-  def update(i: EngineInstance): Unit = {
-    try {
-      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-
-  def delete(id: String): Unit = {
-    try {
-      val response = client.prepareDelete(index, estype, id).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
deleted file mode 100644
index 307b582..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEngineManifests.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EngineManifestSerializer
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.EngineManifest
-import org.apache.predictionio.data.storage.EngineManifests
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEngineManifests(client: Client, config: StorageClientConfig, index: String)
-  extends EngineManifests with Logging {
-  implicit val formats = DefaultFormats + new EngineManifestSerializer
-  private val estype = "engine_manifests"
-  private def esid(id: String, version: String) = s"$id $version"
-
-  def insert(engineManifest: EngineManifest): Unit = {
-    val json = write(engineManifest)
-    val response = client.prepareIndex(
-      index,
-      estype,
-      esid(engineManifest.id, engineManifest.version)).
-      setSource(json).execute().actionGet()
-  }
-
-  def get(id: String, version: String): Option[EngineManifest] = {
-    try {
-      val response = client.prepareGet(index, estype, esid(id, version)).
-        execute().actionGet()
-      if (response.isExists) {
-        Some(read[EngineManifest](response.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[EngineManifest] = {
-    try {
-      val builder = client.prepareSearch()
-      ESUtils.getAll[EngineManifest](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def update(engineManifest: EngineManifest, upsert: Boolean = false): Unit =
-    insert(engineManifest)
-
-  def delete(id: String, version: String): Unit = {
-    try {
-      client.prepareDelete(index, estype, esid(id, version)).execute().actionGet()
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
deleted file mode 100644
index b8d7056..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESEvaluationInstances.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.EvaluationInstance
-import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
-import org.apache.predictionio.data.storage.EvaluationInstances
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.elasticsearch.index.query.FilterBuilders._
-import org.elasticsearch.search.sort.SortOrder
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.read
-import org.json4s.native.Serialization.write
-
-class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String)
-  extends EvaluationInstances with Logging {
-  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
-  private val estype = "evaluation_instances"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val json =
-      (estype ->
-        ("properties" ->
-          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("startTime" -> ("type" -> "date")) ~
-          ("endTime" -> ("type" -> "date")) ~
-          ("evaluationClass" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("engineParamsGeneratorClass" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("batch" ->
-            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
-          ("evaluatorResults" ->
-            ("type" -> "string") ~ ("index" -> "no")) ~
-          ("evaluatorResultsHTML" ->
-            ("type" -> "string") ~ ("index" -> "no")) ~
-          ("evaluatorResultsJSON" ->
-            ("type" -> "string") ~ ("index" -> "no"))))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(json))).get
-  }
-
-  def insert(i: EvaluationInstance): String = {
-    try {
-      val response = client.prepareIndex(index, estype).
-        setSource(write(i)).get
-      response.getId
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        ""
-    }
-  }
-
-  def get(id: String): Option[EvaluationInstance] = {
-    try {
-      val response = client.prepareGet(index, estype, id).get
-      if (response.isExists) {
-        Some(read[EvaluationInstance](response.getSourceAsString))
-      } else {
-        None
-      }
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        None
-    }
-  }
-
-  def getAll(): Seq[EvaluationInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype)
-      ESUtils.getAll[EvaluationInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def getCompleted(): Seq[EvaluationInstance] = {
-    try {
-      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
-        termFilter("status", "EVALCOMPLETED")).
-        addSort("startTime", SortOrder.DESC)
-      ESUtils.getAll[EvaluationInstance](client, builder)
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        Seq()
-    }
-  }
-
-  def update(i: EvaluationInstance): Unit = {
-    try {
-      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-
-  def delete(id: String): Unit = {
-    try {
-      client.prepareDelete(index, estype, id).get
-    } catch {
-      case e: ElasticsearchException => error(e.getMessage)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
deleted file mode 100644
index 80247ec..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESSequences.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s.JsonDSL._
-import org.json4s._
-import org.json4s.native.JsonMethods._
-
-class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
-  implicit val formats = DefaultFormats
-  private val estype = "sequences"
-
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    // val settingsJson =
-    //   ("number_of_shards" -> 1) ~
-    //   ("auto_expand_replicas" -> "0-all")
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val mappingJson =
-      (estype ->
-        ("_source" -> ("enabled" -> 0)) ~
-        ("_all" -> ("enabled" -> 0)) ~
-        ("_type" -> ("index" -> "no")) ~
-        ("enabled" -> 0))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(mappingJson))).get
-  }
-
-  def genNext(name: String): Int = {
-    try {
-      val response = client.prepareIndex(index, estype, name).
-        setSource(compact(render("n" -> name))).get
-      response.getVersion().toInt
-    } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        0
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
deleted file mode 100644
index 5de2999..0000000
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/ESUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import org.elasticsearch.action.search.SearchRequestBuilder
-import org.elasticsearch.client.Client
-import org.elasticsearch.common.unit.TimeValue
-import org.json4s.Formats
-import org.json4s.native.Serialization.read
-
-import scala.collection.mutable.ArrayBuffer
-
-object ESUtils {
-  val scrollLife = new TimeValue(60000)
-
-  def getAll[T : Manifest](
-      client: Client,
-      builder: SearchRequestBuilder)(
-      implicit formats: Formats): Seq[T] = {
-    val results = ArrayBuffer[T]()
-    var response = builder.setScroll(scrollLife).get
-    var hits = response.getHits().hits()
-    results ++= hits.map(h => read[T](h.getSourceAsString))
-    while (hits.size > 0) {
-      response = client.prepareSearchScroll(response.getScrollId).
-        setScroll(scrollLife).get
-      hits = response.getHits().hits()
-      results ++= hits.map(h => read[T](h.getSourceAsString))
-    }
-    results
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
deleted file mode 100644
index 6f6b1c9..0000000
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/StorageClient.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch1
-
-import grizzled.slf4j.Logging
-import org.apache.predictionio.data.storage.BaseStorageClient
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.transport.TransportClient
-import org.elasticsearch.common.settings.ImmutableSettings
-import org.elasticsearch.common.transport.InetSocketTransportAddress
-import org.elasticsearch.transport.ConnectTransportException
-
-class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
-    with Logging {
-  override val prefix = "ES"
-  val client = try {
-    val hosts = config.properties.get("HOSTS").
-      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
-    val ports = config.properties.get("PORTS").
-      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
-    val settings = ImmutableSettings.settingsBuilder()
-      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", 
"elasticsearch"))
-    val transportClient = new TransportClient(settings)
-    (hosts zip ports) foreach { hp =>
-      transportClient.addTransportAddress(
-        new InetSocketTransportAddress(hp._1, hp._2))
-    }
-    transportClient
-  } catch {
-    case e: ConnectTransportException =>
-      throw new StorageClientException(e.getMessage, e)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
deleted file mode 100644
index d6aa24a..0000000
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch1/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage
-
-/** Elasticsearch implementation of storage traits, supporting meta data only
-  *
-  * @group Implementation
-  */
-package object elasticsearch1 {}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala
new file mode 100644
index 0000000..3cdfad9
--- /dev/null
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESAccessKeys.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.AccessKey
+import org.apache.predictionio.data.storage.AccessKeys
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/** Elasticsearch implementation of AccessKeys.
+  *
+  * Each public method opens its own REST client with `client.open()` and
+  * closes it in a `finally` block. An `IOException` raised while talking to
+  * Elasticsearch is logged and mapped to an empty result instead of being
+  * rethrown.
+  */
+class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String)
+    extends AccessKeys with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "accesskeys"
+
+  // Runs at construction time: asks ESUtils to create the index and to register
+  // the mapping for this type. Exceptions raised here are not caught.
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("key" -> ("type" -> "keyword")) ~
+          ("events" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /** Stores `accessKey`, generating a key with `generateKey` when its key is empty.
+    *
+    * @return the key the record was written under. `update` only logs its
+    *         failures, so `Some` is returned even when the write did not succeed.
+    */
+  def insert(accessKey: AccessKey): Option[String] = {
+    val key = if (accessKey.key.isEmpty) generateKey else accessKey.key
+    update(accessKey.copy(key = key))
+    Some(key)
+  }
+
+  /** Fetches the access key stored under `id`.
+    *
+    * @return the key, or `None` when the document is missing (HTTP 404 or
+    *         `found == false`) or when the request fails (the failure is logged)
+    */
+  def get(id: String): Option[AccessKey] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      if ((jsonResponse \ "found").extract[Boolean]) {
+        Some((jsonResponse \ "_source").extract[AccessKey])
+      } else {
+        None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          // A missing document is an expected outcome, so it is not logged.
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        // Was a plain (non-interpolated) string that referred to a non-existent $key.
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Runs a `match_all` query through `ESUtils.getAll`; returns `Nil` on `IOException`. */
+  def getAll(): Seq[AccessKey] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Runs a term query on `appid` through `ESUtils.getAll`; returns `Nil` on `IOException`. */
+  def getByAppid(appid: Int): Seq[AccessKey] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("appid" -> appid)))
+      ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Writes `accessKey` to the document whose id is `accessKey.key`.
+    *
+    * A result other than "created"/"updated", or an `IOException`, is logged;
+    * nothing is reported back to the caller.
+    */
+  def update(accessKey: AccessKey): Unit = {
+    val id = accessKey.key
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Deletes the document stored under `id`.
+    *
+    * A result other than "deleted", or an `IOException`, is logged; nothing is
+    * reported back to the caller.
+    */
+  def delete(id: String): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala
new file mode 100644
index 0000000..99fa2f0
--- /dev/null
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESApps.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.App
+import org.apache.predictionio.data.storage.Apps
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/** Elasticsearch implementation of Apps.
+  *
+  * Each public method opens its own REST client with `client.open()` and
+  * closes it in a `finally` block. An `IOException` raised while talking to
+  * Elasticsearch is logged and mapped to an empty result instead of being
+  * rethrown.
+  */
+class ESApps(client: ESClient, config: StorageClientConfig, index: String)
+    extends Apps with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "apps"
+  private val seq = new ESSequences(client, config, index)
+
+  // Runs at construction time: asks ESUtils to create the index and to register
+  // the mapping for this type. Exceptions raised here are not caught.
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("id" -> ("type" -> "keyword")) ~
+          ("name" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /** Stores `app`. When `app.id` is 0 an id is drawn from `seq.genNext`,
+    * skipping values for which `get` already finds a record.
+    *
+    * @return the id the record was written under. `update` only logs its
+    *         failures, so `Some` is returned even when the write did not succeed.
+    */
+  def insert(app: App): Option[Int] = {
+    val id =
+      if (app.id == 0) {
+        var roll = seq.genNext(estype)
+        while (get(roll).isDefined) roll = seq.genNext(estype)
+        roll
+      } else {
+        app.id
+      }
+    update(app.copy(id = id))
+    Some(id)
+  }
+
+  /** Fetches the app stored under `id`.
+    *
+    * @return the app, or `None` when the document is missing (HTTP 404 or
+    *         `found == false`) or when the request fails (the failure is logged)
+    */
+  def get(id: Int): Option[App] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      if ((jsonResponse \ "found").extract[Boolean]) {
+        Some((jsonResponse \ "_source").extract[App])
+      } else {
+        None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          // A missing document is an expected outcome, so it is not logged.
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Searches with a term query on `name` and returns the first hit.
+    *
+    * @return `None` when there are no hits or when the request fails with an
+    *         `IOException` (the failure is logged)
+    */
+  def getByName(name: String): Option[App] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("name" -> name)))
+      val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/_search",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      (jsonResponse \ "hits" \ "total").extract[Long] match {
+        case 0 => None
+        case _ =>
+          val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+          val result = (results.head \ "_source").extract[App]
+          Some(result)
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Runs a `match_all` query through `ESUtils.getAll`; returns `Nil` on `IOException`. */
+  def getAll(): Seq[App] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Writes `app` to the document whose id is `app.id`.
+    *
+    * A result other than "created"/"updated", or an `IOException`, is logged;
+    * nothing is reported back to the caller.
+    */
+  def update(app: App): Unit = {
+    val id = app.id.toString
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Deletes the document stored under `id`.
+    *
+    * A result other than "deleted", or an `IOException`, is logged; nothing is
+    * reported back to the caller.
+    */
+  def delete(id: Int): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala
new file mode 100644
index 0000000..b8ae5b4
--- /dev/null
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESChannels.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.Channel
+import org.apache.predictionio.data.storage.Channels
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/** Elasticsearch implementation of Channels.
+  *
+  * Each public method opens its own REST client with `client.open()` and
+  * closes it in a `finally` block. An `IOException` raised while talking to
+  * Elasticsearch is logged and mapped to an empty/negative result instead of
+  * being rethrown.
+  */
+class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
+    extends Channels with Logging {
+  implicit val formats = DefaultFormats.lossless
+  private val estype = "channels"
+  private val seq = new ESSequences(client, config, index)
+
+  // Runs at construction time: asks ESUtils to create the index and to register
+  // the mapping for this type. Exceptions raised here are not caught.
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("name" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /** Stores `channel`. When `channel.id` is 0 an id is drawn from
+    * `seq.genNext`, skipping values for which `get` already finds a record.
+    *
+    * @return the id the record was written under, or `None` when `update`
+    *         reports failure
+    */
+  def insert(channel: Channel): Option[Int] = {
+    val id =
+      if (channel.id == 0) {
+        var roll = seq.genNext(estype)
+        while (get(roll).isDefined) roll = seq.genNext(estype)
+        roll
+      } else {
+        channel.id
+      }
+
+    if (update(channel.copy(id = id))) Some(id) else None
+  }
+
+  /** Fetches the channel stored under `id`.
+    *
+    * @return the channel, or `None` when the document is missing (HTTP 404 or
+    *         `found == false`) or when the request fails (the failure is logged)
+    */
+  def get(id: Int): Option[Channel] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      if ((jsonResponse \ "found").extract[Boolean]) {
+        Some((jsonResponse \ "_source").extract[Channel])
+      } else {
+        None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          // A missing document is an expected outcome, so it is not logged.
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Runs a term query on `appid` through `ESUtils.getAll`; returns `Nil` on `IOException`. */
+  def getByAppid(appid: Int): Seq[Channel] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("appid" -> appid)))
+      ESUtils.getAll[Channel](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Writes `channel` to the document whose id is `channel.id`.
+    *
+    * @return `true` when Elasticsearch answers "created" or "updated";
+    *         `false` (after logging) for any other result or on `IOException`
+    */
+  def update(channel: Channel): Boolean = {
+    val id = channel.id.toString
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "created" => true
+        case "updated" => true
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+          false
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+        false
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Deletes the document stored under `id`.
+    *
+    * A result other than "deleted", or an `IOException`, is logged; nothing is
+    * reported back to the caller.
+    */
+  def delete(id: Int): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala
new file mode 100644
index 0000000..d8d3c75
--- /dev/null
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEngineInstances.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.EngineInstance
+import org.apache.predictionio.data.storage.EngineInstanceSerializer
+import org.apache.predictionio.data.storage.EngineInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/** Elasticsearch implementation of EngineInstances.
+  *
+  * Each public method opens its own REST client with `client.open()` and
+  * closes it in a `finally` block. An `IOException` raised while talking to
+  * Elasticsearch is logged and mapped to an empty result instead of being
+  * rethrown.
+  */
+class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String)
+    extends EngineInstances with Logging {
+  implicit val formats = DefaultFormats + new EngineInstanceSerializer
+  private val estype = "engine_instances"
+
+  // Runs at construction time: asks ESUtils to create the index and to register
+  // the mapping for this type. Exceptions raised here are not caught.
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    // "status" used to be listed twice, which rendered a duplicate JSON key.
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("status" -> ("type" -> "keyword")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("engineId" -> ("type" -> "keyword")) ~
+          ("engineVersion" -> ("type" -> "keyword")) ~
+          ("engineVariant" -> ("type" -> "keyword")) ~
+          ("engineFactory" -> ("type" -> "keyword")) ~
+          ("batch" -> ("type" -> "keyword")) ~
+          ("dataSourceParams" -> ("type" -> "keyword")) ~
+          ("preparatorParams" -> ("type" -> "keyword")) ~
+          ("algorithmsParams" -> ("type" -> "keyword")) ~
+          ("servingParams" -> ("type" -> "keyword"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /** Stores `i`, obtaining an id from `preInsert` when `i.id` is empty.
+    *
+    * @return the id the instance was written under. `update` only logs its
+    *         failures, so the id is returned even when the write did not succeed.
+    */
+  def insert(i: EngineInstance): String = {
+    // Keep calling preInsert() until it yields an id.
+    // NOTE(review): this never terminates while preInsert() keeps returning None.
+    @scala.annotation.tailrec
+    def generateId(newId: Option[String]): String = {
+      newId match {
+        case Some(x) => x
+        case _ => generateId(preInsert())
+      }
+    }
+
+    val id = if (i.id.isEmpty) generateId(preInsert()) else i.id
+    update(i.copy(id = id))
+    id
+  }
+
+  /** Posts an empty document (`{}`) to the type endpoint without an id.
+    *
+    * @return the `_id` field of the response when the result is "created";
+    *         `None` (after logging) for any other result or on `IOException`
+    */
+  def preInsert(): Option[String] = {
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+          Some((jsonResponse \ "_id").extract[String])
+        case _ =>
+          error(s"[$result] Failed to create $index/$estype")
+          None
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to create $index/$estype", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Fetches the engine instance stored under `id`.
+    *
+    * @return the instance, or `None` when the document is missing (HTTP 404 or
+    *         `found == false`) or when the request fails (the failure is logged)
+    */
+  def get(id: String): Option[EngineInstance] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      if ((jsonResponse \ "found").extract[Boolean]) {
+        Some((jsonResponse \ "_source").extract[EngineInstance])
+      } else {
+        None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          // A missing document is an expected outcome, so it is not logged.
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Runs a `match_all` query through `ESUtils.getAll`; returns `Nil` on `IOException`. */
+  def getAll(): Seq[EngineInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Queries for instances whose status is "COMPLETED" and whose engine id,
+    * version and variant equal the arguments, sorted by `startTime` descending.
+    *
+    * @return the matches from `ESUtils.getAll`, or `Nil` on `IOException`
+    */
+  def getCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Seq[EngineInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("bool" ->
+            ("must" -> List(
+              ("term" ->
+                ("status" -> "COMPLETED")),
+              ("term" ->
+                ("engineId" -> engineId)),
+              ("term" ->
+                ("engineVersion" -> engineVersion)),
+              ("term" ->
+                ("engineVariant" -> engineVariant)))))) ~
+        ("sort" -> List(
+          ("startTime" ->
+            ("order" -> "desc"))))
+      ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Returns the first element of `getCompleted` for the same arguments, if any. */
+  def getLatestCompleted(
+    engineId: String,
+    engineVersion: String,
+    engineVariant: String): Option[EngineInstance] =
+    getCompleted(
+      engineId,
+      engineVersion,
+      engineVariant).headOption
+
+  /** Writes `i` to the document whose id is `i.id`.
+    *
+    * A result other than "created"/"updated", or an `IOException`, is logged;
+    * nothing is reported back to the caller.
+    */
+  def update(i: EngineInstance): Unit = {
+    val id = i.id
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** Deletes the document stored under `id`.
+    *
+    * A result other than "deleted", or an `IOException`, is logged; nothing is
+    * reported back to the caller.
+    */
+  def delete(id: String): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala
new file mode 100644
index 0000000..d2f8623
--- /dev/null
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEvaluationInstances.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.EvaluationInstance
+import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
+import org.apache.predictionio.data.storage.EvaluationInstances
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/**
+ * [[EvaluationInstances]] implementation that keeps evaluation instances in Elasticsearch,
+ * reached through REST clients obtained from `client`.
+ *
+ * Documents live under the `evaluation_instances` type of `index`. Construction calls
+ * `ESUtils.createIndex` and `ESUtils.createMapping` for that index and type.
+ *
+ * @param client source of REST clients; every operation opens one and closes it afterwards
+ * @param config storage client configuration, passed on to [[ESSequences]]
+ * @param index  name of the Elasticsearch index to operate on
+ */
+class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String)
+    extends EvaluationInstances with Logging {
+  implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
+  private val estype = "evaluation_instances"
+  private val seq = new ESSequences(client, config, index)
+
+  // Runs once at construction. Note that `restClient` stays a member of the class after it
+  // has been closed below; every method opens its own client, which shadows this one.
+  val restClient = client.open()
+  try {
+    ESUtils.createIndex(restClient, index)
+    val mappingJson =
+      (estype ->
+        ("_all" -> ("enabled" -> 0)) ~
+        ("properties" ->
+          ("status" -> ("type" -> "keyword")) ~
+          ("startTime" -> ("type" -> "date")) ~
+          ("endTime" -> ("type" -> "date")) ~
+          ("evaluationClass" -> ("type" -> "keyword")) ~
+          ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
+          ("batch" -> ("type" -> "keyword")) ~
+          ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
+          ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
+          ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
+    ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson)))
+  } finally {
+    restClient.close()
+  }
+
+  /**
+   * Stores `i` and returns the id it was stored under.
+   *
+   * When `i.id` is empty an id is drawn from the sequence generator, skipping values for
+   * which a document already exists; otherwise `i.id` is used as is.
+   *
+   * @param i evaluation instance to store
+   * @return the id of the stored document
+   */
+  def insert(i: EvaluationInstance): String = {
+    val id =
+      if (i.id.isEmpty) {
+        // No id supplied: keep drawing sequence numbers until one is not taken yet.
+        var roll = seq.genNext(estype).toString
+        while (get(roll).isDefined) roll = seq.genNext(estype).toString
+        roll
+      } else {
+        i.id
+      }
+
+    update(i.copy(id = id))
+    id
+  }
+
+  /**
+   * Looks up the evaluation instance stored under `id`.
+   *
+   * @param id document id
+   * @return the instance when the response reports `found`; `None` when it does not, on a
+   *         404 response, or (after logging) on any other request failure
+   */
+  def get(id: String): Option[EvaluationInstance] = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      if ((jsonResponse \ "found").extract[Boolean]) {
+        Some((jsonResponse \ "_source").extract[EvaluationInstance])
+      } else {
+        None
+      }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          // A missing document is an expected outcome, not an error worth logging.
+          case 404 => None
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            None
+        }
+      case e: IOException =>
+        error(s"Failed to access to /$index/$estype/$id", e)
+        None
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Fetches every stored evaluation instance using a `match_all` query.
+   *
+   * @return the instances returned by `ESUtils.getAll`, or `Nil` (after logging) when the
+   *         request fails with an `IOException`
+   */
+  def getAll(): Seq[EvaluationInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        // The `s` interpolator is required for the index and type to appear in the log.
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Fetches the evaluation instances whose `status` is `EVALCOMPLETED`, requesting them
+   * sorted by `startTime` in descending order.
+   *
+   * @return the matching instances, or `Nil` (after logging) when the request fails with an
+   *         `IOException`
+   */
+  def getCompleted(): Seq[EvaluationInstance] = {
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("term" ->
+            ("status" -> "EVALCOMPLETED"))) ~
+        ("sort" ->
+          ("startTime" ->
+            ("order" -> "desc")))
+      ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json)))
+    } catch {
+      case e: IOException =>
+        // The `s` interpolator is required for the index and type to appear in the log.
+        error(s"Failed to access to /$index/$estype/_search", e)
+        Nil
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Writes `i` to the document identified by `i.id`.
+   *
+   * A response whose `result` is neither `"created"` nor `"updated"`, or an `IOException`
+   * raised by the request, is logged and otherwise ignored.
+   *
+   * @param i evaluation instance to write
+   */
+  def update(i: EvaluationInstance): Unit = {
+    val id = i.id
+    val restClient = client.open()
+    try {
+      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
+      val response = restClient.performRequest(
+        "POST",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava,
+        entity)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "created" =>
+        case "updated" =>
+        case _ =>
+          error(s"[$result] Failed to update $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to update $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /**
+   * Deletes the document stored under `id`.
+   *
+   * A response whose `result` is not `"deleted"`, or an `IOException` raised by the
+   * request, is logged and otherwise ignored.
+   *
+   * @param id id of the document to delete
+   */
+  def delete(id: String): Unit = {
+    val restClient = client.open()
+    try {
+      val response = restClient.performRequest(
+        "DELETE",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava)
+      val json = parse(EntityUtils.toString(response.getEntity))
+      val result = (json \ "result").extract[String]
+      result match {
+        case "deleted" =>
+        case _ =>
+          error(s"[$result] Failed to delete $index/$estype/$id")
+      }
+    } catch {
+      case e: IOException =>
+        error(s"Failed to delete $index/$estype/$id", e)
+    } finally {
+      restClient.close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala
new file mode 100644
index 0000000..3f70266
--- /dev/null
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESEventsUtil.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.DataMap
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.EventValidation
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.json4s._
+
+/** Conversions between [[Event]] and map-based record representations. */
+object ESEventsUtil {
+
+  implicit val formats = DefaultFormats
+
+  /**
+   * Builds an [[Event]] from a record given as a Hadoop `MapWritable`.
+   *
+   * The columns `eventId`, `event`, `entityType`, `entityId`, `eventTime` and
+   * `creationTime` are mandatory; `targetEntityType`, `targetEntityId`, `prId`,
+   * `eventTimeZone` and `creationTimeZone` are optional. Of the event properties only
+   * `properties.fields.rating` is carried over (when it is a double or a long); the
+   * resulting event has no tags.
+   *
+   * @param id     record id; not used by this method
+   * @param result record holding the event columns
+   * @param appId  application id; not used by this method
+   * @return the reconstructed event
+   * @throws IllegalArgumentException if a mandatory column is missing
+   */
+  def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = {
+
+    def getStringCol(col: String): String = {
+      val r = result.get(new Text(col)).asInstanceOf[Text]
+      // `require` evaluates its message only when the check fails, i.e. when `r` is null,
+      // so the message must not dereference `r`.
+      require(r != null, s"Failed to get value for column ${col}.")
+
+      r.toString()
+    }
+
+    def getOptStringCol(col: String): Option[String] =
+      Option(result.get(new Text(col))).map(_.asInstanceOf[Text].toString())
+
+    // Returns the nested map stored under `key`, or None when the key is absent.
+    def getOptMapCol(from: MapWritable, key: String): Option[MapWritable] =
+      Option(from.get(new Text(key))).map(_.asInstanceOf[MapWritable])
+
+    // A record without `properties`, `fields` or `rating` simply has no rating.
+    val rating: Option[Double] =
+      for {
+        props <- getOptMapCol(result, "properties")
+        fields <- getOptMapCol(props, "fields")
+        value <- Option(fields.get(new Text("rating")))
+        number <- value match {
+          case d: DoubleWritable => Some(d.get())
+          case l: LongWritable => Some(l.get().toDouble)
+          case _ => None
+        }
+      } yield number
+
+    val properties: DataMap =
+      rating.map(r => DataMap(s"""{"rating":${r.toString}}""")).getOrElse(DataMap())
+
+    val eventId = Some(getStringCol("eventId"))
+    val event = getStringCol("event")
+    val entityType = getStringCol("entityType")
+    val entityId = getStringCol("entityId")
+    val targetEntityType = getOptStringCol("targetEntityType")
+    val targetEntityId = getOptStringCol("targetEntityId")
+    val prId = getOptStringCol("prId")
+    val eventTimeZone = getOptStringCol("eventTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val eventTime = new DateTime(
+      getStringCol("eventTime"), eventTimeZone)
+    val creationTimeZone = getOptStringCol("creationTimeZone")
+      .map(DateTimeZone.forID(_))
+      .getOrElse(EventValidation.defaultTimeZone)
+    val creationTime: DateTime = new DateTime(
+      getStringCol("creationTime"), creationTimeZone)
+
+    Event(
+      eventId = eventId,
+      event = event,
+      entityType = entityType,
+      entityId = entityId,
+      targetEntityType = targetEntityType,
+      targetEntityId = targetEntityId,
+      properties = properties,
+      eventTime = eventTime,
+      tags = Seq(),
+      prId = prId,
+      creationTime = creationTime
+    )
+  }
+
+  /**
+   * Flattens `event` into a single-element sequence holding a map from field name to the
+   * corresponding field value, with the values left in their original types.
+   *
+   * @param event event to convert
+   * @param appId application id; not used by this method
+   * @return a one-element sequence containing the field map
+   */
+  def eventToPut(event: Event, appId: Int): Seq[Map[String, Any]] = {
+    Seq(
+      Map(
+        "eventId" -> event.eventId,
+        "event" -> event.event,
+        "entityType" -> event.entityType,
+        "entityId" -> event.entityId,
+        "targetEntityType" -> event.targetEntityType,
+        "targetEntityId" -> event.targetEntityId,
+        "properties" -> event.properties,
+        "eventTime" -> event.eventTime,
+        "tags" -> event.tags,
+        "prId" -> event.prId,
+        "creationTime" -> event.creationTime
+      )
+    )
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala
new file mode 100644
index 0000000..74aa4b5
--- /dev/null
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch5/ESLEvents.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch5
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.LEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.elasticsearch.client.RestClient
+import org.joda.time.DateTime
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+import org.json4s.ext.JodaTimeSerializers
+
+import grizzled.slf4j.Logging
+import org.elasticsearch.client.ResponseException
+
+/**
+ * [[LEvents]] implementation that stores events in Elasticsearch, reached through REST
+ * clients obtained from `client`.
+ *
+ * Events of an app (and optional channel) are kept under their own type of `index`; see
+ * `getEsType` for the naming. Every operation opens a REST client and closes it when done.
+ *
+ * @param client source of REST clients
+ * @param config storage client configuration, passed on to [[ESSequences]]
+ * @param index  name of the Elasticsearch index to operate on
+ */
+class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
+    extends LEvents with Logging {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+  private val seq = new ESSequences(client, config, index)
+  private val seqName = "events"
+
+  /**
+   * Name of the Elasticsearch type holding the events of an app and channel.
+   *
+   * @param appId     application id
+   * @param channelId optional channel id
+   * @return `"<appId>_<channelId>"` when a channel is given, `"<appId>"` otherwise
+   */
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  /**
+   * Prepares storage for an app/channel by calling `ESUtils.createIndex` and registering
+   * the event mapping for the corresponding type.
+   *
+   * @param appId     application id
+   * @param channelId optional channel id
+   * @return always `true`; failures surface as exceptions
+   */
+  override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    val restClient = client.open()
+    try {
+      ESUtils.createIndex(restClient, index)
+      val json =
+        (estype ->
+          ("_all" -> ("enabled" -> 0)) ~
+          ("properties" ->
+            ("name" -> ("type" -> "keyword")) ~
+            ("eventId" -> ("type" -> "keyword")) ~
+            ("event" -> ("type" -> "keyword")) ~
+            ("entityType" -> ("type" -> "keyword")) ~
+            ("entityId" -> ("type" -> "keyword")) ~
+            ("targetEntityType" -> ("type" -> "keyword")) ~
+            ("targetEntityId" -> ("type" -> "keyword")) ~
+            ("properties" ->
+              ("type" -> "nested") ~
+              ("properties" ->
+                ("fields" -> ("type" -> "nested") ~
+                  ("properties" ->
+                    ("user" -> ("type" -> "long")) ~
+                    ("num" -> ("type" -> "long")))))) ~
+            ("eventTime" -> ("type" -> "date")) ~
+            ("tags" -> ("type" -> "keyword")) ~
+            ("prId" -> ("type" -> "keyword")) ~
+            ("creationTime" -> ("type" -> "date"))))
+      ESUtils.createMapping(restClient, index, estype, compact(render(json)))
+    } finally {
+      restClient.close()
+    }
+    true
+  }
+
+  /**
+   * Removes all events of an app/channel with a `match_all` delete-by-query request.
+   *
+   * @param appId     application id
+   * @param channelId optional channel id
+   * @return `true` when the request answers with status 200; `false` (after logging) for
+   *         any other status or when the request throws
+   */
+  override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
+    val estype = getEsType(appId, channelId)
+    val restClient = client.open()
+    try {
+      val json =
+        ("query" ->
+          ("match_all" -> List.empty))
+      val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+      restClient.performRequest(
+        "POST",
+        s"/$index/$estype/_delete_by_query",
+        Map.empty[String, String].asJava,
+        entity).getStatusLine.getStatusCode match {
+          case 200 => true
+          case _ =>
+            error(s"Failed to remove $index/$estype")
+            false
+        }
+    } catch {
+      case e: Exception =>
+        error(s"Failed to remove $index/$estype", e)
+        false
+    } finally {
+      restClient.close()
+    }
+  }
+
+  /** No-op: this class holds no long-lived resources. */
+  override def close(): Unit = {
+    // nothing
+  }
+
+  /**
+   * Stores `event` and yields the id it was stored under.
+   *
+   * When the event carries no id, one is drawn from the sequence generator, skipping
+   * values for which a document already exists.
+   *
+   * @param event     event to store
+   * @param appId     application id
+   * @param channelId optional channel id
+   * @return a future holding the event id, or the empty string (after logging) when the
+   *         response reports an unexpected result or the request fails with an
+   *         `IOException`
+   */
+  override def futureInsert(
+    event: Event,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val id = event.eventId.getOrElse {
+          var roll = seq.genNext(seqName)
+          while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
+          roll.toString
+        }
+        val json = write(event.copy(eventId = Some(id)))
+        val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+        val response = restClient.performRequest(
+          "POST",
+          s"/$index/$estype/$id",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        val result = (jsonResponse \ "result").extract[String]
+        result match {
+          case "created" => id
+          case "updated" => id
+          case _ =>
+            error(s"[$result] Failed to update $index/$estype/$id")
+            ""
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to update $index/$estype/<id>", e)
+          ""
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+  /**
+   * Tells whether a document with the given numeric id exists in `estype`.
+   *
+   * @return `true` only for a 200 response; `false` for a 404, and `false` after logging
+   *         for any other failure
+   */
+  private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
+    try {
+      restClient.performRequest(
+        "GET",
+        s"/$index/$estype/$id",
+        Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+          case 200 => true
+          case _ => false
+        }
+    } catch {
+      case e: ResponseException =>
+        e.getResponse.getStatusLine.getStatusCode match {
+          case 404 => false
+          case _ =>
+            error(s"Failed to access to /$index/$estype/$id", e)
+            false
+        }
+      case e: IOException =>
+        error(s"Failed to access to $index/$estype/$id", e)
+        false
+    }
+  }
+
+  /**
+   * Looks up an event by its `eventId` field using a term query.
+   *
+   * @param eventId   id of the event
+   * @param appId     application id
+   * @param channelId optional channel id
+   * @return a future holding the first hit, or `None` when there are no hits or (after
+   *         logging) when the request fails with an `IOException`
+   */
+  override def futureGet(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        val response = restClient.performRequest(
+          "POST",
+          s"/$index/$estype/_search",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        (jsonResponse \ "hits" \ "total").extract[Long] match {
+          case 0 => None
+          case _ =>
+            val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
+            val result = (results.head \ "_source").extract[Event]
+            Some(result)
+        }
+      } catch {
+        case e: IOException =>
+          // The `s` interpolator is required for the index and type to appear in the log.
+          error(s"Failed to access to /$index/$estype/_search", e)
+          None
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+  /**
+   * Deletes the event(s) whose `eventId` field equals `eventId` with a delete-by-query
+   * request.
+   *
+   * @param eventId   id of the event to delete
+   * @param appId     application id
+   * @param channelId optional channel id
+   * @return a future holding `true` when the response reports at least one deleted
+   *         document; `false` (after logging) otherwise or when the request fails with an
+   *         `IOException`
+   */
+  override def futureDelete(
+    eventId: String,
+    appId: Int,
+    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val json =
+          ("query" ->
+            ("term" ->
+              ("eventId" -> eventId)))
+        val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+        // The query has to be sent as the request body; without it the request does not
+        // say which documents to delete.
+        val response = restClient.performRequest(
+          "POST",
+          s"/$index/$estype/_delete_by_query",
+          Map.empty[String, String].asJava,
+          entity)
+        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+        // A delete-by-query response carries a `deleted` count rather than the `result`
+        // string returned by single-document operations.
+        (jsonResponse \ "deleted").extractOpt[Int] match {
+          case Some(count) if count > 0 => true
+          case _ =>
+            error(s"Failed to delete $index/$estype:$eventId")
+            false
+        }
+      } catch {
+        case e: IOException =>
+          error(s"Failed to delete $index/$estype:$eventId", e)
+          false
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+  /**
+   * Finds the events of an app/channel matching the given filters.
+   *
+   * The filters are turned into a query by `ESUtils.createEventQuery` and all hits are
+   * fetched through `ESUtils.getAll`; `limit` is then applied to the fetched events.
+   *
+   * @param limit    maximum number of events to return; `None` or a negative value returns
+   *                 every match
+   * @param reversed NOTE(review): currently not forwarded to the query builder (`None` is
+   *                 passed instead), so result order does not depend on it; confirm
+   *                 against `ESUtils.createEventQuery` before wiring it through
+   * @return a future holding an iterator over the matching events, empty (after logging)
+   *         when the request fails with an `IOException`
+   */
+  override def futureFind(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    limit: Option[Int] = None,
+    reversed: Option[Boolean] = None)
+    (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
+    Future {
+      val estype = getEsType(appId, channelId)
+      val restClient = client.open()
+      try {
+        val query = ESUtils.createEventQuery(
+          startTime, untilTime, entityType, entityId,
+          eventNames, targetEntityType, targetEntityId, None)
+        val events = ESUtils.getAll[Event](restClient, index, estype, query).toIterator
+        // Honour the caller's limit instead of silently returning every match.
+        limit match {
+          case Some(n) if n >= 0 => events.take(n)
+          case _ => events
+        }
+      } catch {
+        case e: IOException =>
+          error(e.getMessage)
+          Iterator[Event]()
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+}


Reply via email to