[PIO-49] Rename Elasticsearch packages

Rename elasticsearch1 back to elasticsearch to maintain backward
compatibility with existing configuration files

New ES5+ support now lives under elasticsearch5 package

Includes a minor fix to the "pio status" output not showing Spark's
proper location


Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/c64941b6
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/c64941b6
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/c64941b6

Branch: refs/heads/feature/es5
Commit: c64941b6e11666ea160f4d19bd4a2302f988d9dd
Parents: d039dda
Author: Donald Szeto <[email protected]>
Authored: Sat Feb 11 13:57:44 2017 -0800
Committer: Donald Szeto <[email protected]>
Committed: Sat Feb 11 13:57:44 2017 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 build.sbt                                       |   4 +-
 conf/pio-env.sh.template                        |  18 +-
 core/build.sbt                                  |   4 +-
 data/build.sbt                                  |   8 +-
 .../storage/elasticsearch/ESAccessKeys.scala    | 154 ++++------
 .../data/storage/elasticsearch/ESApps.scala     | 176 ++++--------
 .../data/storage/elasticsearch/ESChannels.scala | 144 ++++------
 .../elasticsearch/ESEngineInstances.scala       | 246 +++++-----------
 .../elasticsearch/ESEvaluationInstances.scala   | 186 +++++-------
 .../storage/elasticsearch/ESEventsUtil.scala    | 125 --------
 .../data/storage/elasticsearch/ESLEvents.scala  | 286 -------------------
 .../data/storage/elasticsearch/ESPEvents.scala  | 145 ----------
 .../storage/elasticsearch/ESSequences.scala     |  71 ++---
 .../data/storage/elasticsearch/ESUtils.scala    | 157 ++--------
 .../storage/elasticsearch/StorageClient.scala   |  40 +--
 .../storage/elasticsearch1/ESAccessKeys.scala   | 119 --------
 .../data/storage/elasticsearch1/ESApps.scala    | 130 ---------
 .../storage/elasticsearch1/ESChannels.scala     | 117 --------
 .../elasticsearch1/ESEngineInstances.scala      | 158 ----------
 .../elasticsearch1/ESEngineManifests.scala      |  84 ------
 .../elasticsearch1/ESEvaluationInstances.scala  | 136 ---------
 .../storage/elasticsearch1/ESSequences.scala    |  64 -----
 .../data/storage/elasticsearch1/ESUtils.scala   |  48 ----
 .../storage/elasticsearch1/StorageClient.scala  |  50 ----
 .../data/storage/elasticsearch1/package.scala   |  25 --
 .../storage/elasticsearch5/ESAccessKeys.scala   | 175 ++++++++++++
 .../data/storage/elasticsearch5/ESApps.scala    | 194 +++++++++++++
 .../storage/elasticsearch5/ESChannels.scala     | 165 +++++++++++
 .../elasticsearch5/ESEngineInstances.scala      | 248 ++++++++++++++++
 .../elasticsearch5/ESEvaluationInstances.scala  | 194 +++++++++++++
 .../storage/elasticsearch5/ESEventsUtil.scala   | 125 ++++++++
 .../data/storage/elasticsearch5/ESLEvents.scala | 286 +++++++++++++++++++
 .../data/storage/elasticsearch5/ESPEvents.scala | 145 ++++++++++
 .../storage/elasticsearch5/ESSequences.scala    |  79 +++++
 .../data/storage/elasticsearch5/ESUtils.scala   | 163 +++++++++++
 .../storage/elasticsearch5/StorageClient.scala  |  44 +++
 .../data/storage/elasticsearch5/package.scala   |  25 ++
 project/Build.scala                             |   6 +-
 .../tools/commands/Management.scala             |   2 +-
 40 files changed, 2233 insertions(+), 2317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 4d62999..68dee42 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -63,8 +63,8 @@ env:
   matrix:
     - BUILD_TYPE=Unit
     - BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL 
MODELDATA_REP=PGSQL
-    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=HBASE 
MODELDATA_REP=LOCALFS
-    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=PGSQL 
MODELDATA_REP=HDFS
+    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE 
MODELDATA_REP=LOCALFS
+    - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL 
MODELDATA_REP=HDFS
 
 before_install:
   - unset SBT_OPTS JVM_OPTS

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/build.sbt
----------------------------------------------------------------------
diff --git a/build.sbt b/build.sbt
index ce626ce..1e9eb8a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -34,9 +34,9 @@ fork in (ThisBuild, run) := true
 javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", 
"1.7",
   "-Xlint:deprecation", "-Xlint:unchecked")
 
-elasticsearchVersion in ThisBuild := "5.1.2"
+elasticsearch5Version in ThisBuild := "5.1.2"
 
-elasticsearch1Version in ThisBuild := "1.7.6"
+elasticsearchVersion in ThisBuild := "1.7.6"
 
 json4sVersion in ThisBuild := "3.2.10"
 

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/conf/pio-env.sh.template
----------------------------------------------------------------------
diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template
index 8f5d7b1..f56f137 100644
--- a/conf/pio-env.sh.template
+++ b/conf/pio-env.sh.template
@@ -84,17 +84,17 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio
 # PIO_STORAGE_SOURCES_MYSQL_PASSWORD=pio
 
 # Elasticsearch Example
+# PIO_STORAGE_SOURCES_ELASTICSEARCH5_TYPE=elasticsearch5
+# PIO_STORAGE_SOURCES_ELASTICSEARCH5_HOSTS=localhost
+# PIO_STORAGE_SOURCES_ELASTICSEARCH5_PORTS=9200
+# PIO_STORAGE_SOURCES_ELASTICSEARCH5_SCHEMES=http
+# PIO_STORAGE_SOURCES_ELASTICSEARCH5_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
+# Elasticsearch 1.x Example
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name>
 # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http
-# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2
-# Elasticsearch 1.x Example
-# PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1
-# PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=<elasticsearch_cluster_name>
-# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost
-# PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300
-# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300
+# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6
 
 # Local File System Example
 # PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/core/build.sbt
----------------------------------------------------------------------
diff --git a/core/build.sbt b/core/build.sbt
index 305075e..b1f589d 100644
--- a/core/build.sbt
+++ b/core/build.sbt
@@ -32,8 +32,8 @@ libraryDependencies ++= Seq(
   "org.apache.spark"        %% "spark-core"       % sparkVersion.value % 
"provided",
   "org.apache.spark"        %% "spark-sql"        % sparkVersion.value % 
"provided",
   "org.clapper"             %% "grizzled-slf4j"   % "1.0.2",
-  "org.elasticsearch.client" % "rest"             % elasticsearchVersion.value,
-  "org.elasticsearch"        % "elasticsearch"    % 
elasticsearch1Version.value,
+  "org.elasticsearch.client" % "rest"             % 
elasticsearch5Version.value,
+  "org.elasticsearch"        % "elasticsearch"    % elasticsearchVersion.value,
   "org.json4s"              %% "json4s-native"    % json4sVersion.value,
   "org.json4s"              %% "json4s-ext"       % json4sVersion.value,
   "org.scalaj"              %% "scalaj-http"      % "1.1.6",

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/build.sbt
----------------------------------------------------------------------
diff --git a/data/build.sbt b/data/build.sbt
index 75d3c09..306153a 100644
--- a/data/build.sbt
+++ b/data/build.sbt
@@ -43,10 +43,10 @@ libraryDependencies ++= Seq(
   "org.apache.spark"        %% "spark-core"     % sparkVersion.value % 
"provided",
   "org.apache.spark"        %% "spark-sql"      % sparkVersion.value % 
"provided",
   "org.clapper"             %% "grizzled-slf4j" % "1.0.2",
-  "org.elasticsearch.client" % "rest"           % elasticsearchVersion.value,
-  "org.elasticsearch"        % "elasticsearch"  % elasticsearch1Version.value,
-  "org.elasticsearch"        % "elasticsearch-spark-13_2.10" % 
elasticsearchVersion.value % "provided",
-  "org.elasticsearch"        % "elasticsearch-hadoop-mr" % 
elasticsearchVersion.value,
+  "org.elasticsearch.client" % "rest"           % elasticsearch5Version.value,
+  "org.elasticsearch"        % "elasticsearch"  % elasticsearchVersion.value,
+  "org.elasticsearch"       %% "elasticsearch-spark-13" % 
elasticsearch5Version.value % "provided",
+  "org.elasticsearch"        % "elasticsearch-hadoop-mr" % 
elasticsearch5Version.value,
   "org.json4s"              %% "json4s-native"  % json4sVersion.value,
   "org.json4s"              %% "json4s-ext"     % json4sVersion.value,
   "org.postgresql"           % "postgresql"     % "9.4-1204-jdbc41",

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
index 9156fab..077168a 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala
@@ -15,45 +15,44 @@
  * limitations under the License.
  */
 
-package org.apache.predictionio.data.storage.elasticsearch
-
-import java.io.IOException
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.entity.ContentType
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.util.EntityUtils
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.AccessKey
 import org.apache.predictionio.data.storage.AccessKeys
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
-import org.json4s._
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
 import org.json4s.JsonDSL._
+import org.json4s._
 import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
+import scala.util.Random
 
 /** Elasticsearch implementation of AccessKeys. */
-class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: 
String)
+class ESAccessKeys(client: Client, config: StorageClientConfig, index: String)
     extends AccessKeys with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "accesskeys"
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index)
-    val mappingJson =
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = 
indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
         ("properties" ->
-          ("key" -> ("type" -> "keyword")) ~
-          ("events" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, 
compact(render(mappingJson)))
-  } finally {
-    restClient.close()
+          ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
   }
 
   def insert(accessKey: AccessKey): Option[String] = {
@@ -62,114 +61,59 @@ class ESAccessKeys(client: ESClient, config: 
StorageClientConfig, index: String)
     Some(key)
   }
 
-  def get(id: String): Option[AccessKey] = {
-    val restClient = client.open()
+  def get(key: String): Option[AccessKey] = {
     try {
-      val response = restClient.performRequest(
-        "GET",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      (jsonResponse \ "found").extract[Boolean] match {
-        case true =>
-          Some((jsonResponse \ "_source").extract[AccessKey])
-        case _ =>
-          None
-      }
+      val response = client.prepareGet(
+        index,
+        estype,
+        key).get()
+      Some(read[AccessKey](response.getSourceAsString))
     } catch {
-      case e: ResponseException =>
-        e.getResponse.getStatusLine.getStatusCode match {
-          case 404 => None
-          case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
-            None
-        }
-      case e: IOException =>
-        error("Failed to access to /$index/$estype/$key", e)
+      case e: ElasticsearchException =>
+        error(e.getMessage)
         None
-    } finally {
-      restClient.close()
+      case e: NullPointerException => None
     }
   }
 
   def getAll(): Seq[AccessKey] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("match_all" -> List.empty))
-      ESUtils.getAll[AccessKey](restClient, index, estype, 
compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[AccessKey](client, builder)
     } catch {
-      case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
     }
   }
 
   def getByAppid(appid: Int): Seq[AccessKey] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("term" ->
-            ("appid" -> appid)))
-      ESUtils.getAll[AccessKey](restClient, index, estype, 
compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[AccessKey](client, builder)
     } catch {
-      case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[AccessKey]()
     }
   }
 
   def update(accessKey: AccessKey): Unit = {
-    val id = accessKey.key
-    val restClient = client.open()
     try {
-      val entity = new NStringEntity(write(accessKey), 
ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava,
-        entity)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      val result = (jsonResponse \ "result").extract[String]
-      result match {
-        case "created" =>
-        case "updated" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      client.prepareIndex(index, estype, 
accessKey.key).setSource(write(accessKey)).get()
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
     }
   }
 
-  def delete(id: String): Unit = {
-    val restClient = client.open()
+  def delete(key: String): Unit = {
     try {
-      val response = restClient.performRequest(
-        "DELETE",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val json = parse(EntityUtils.toString(response.getEntity))
-      val result = (json \ "result").extract[String]
-      result match {
-        case "deleted" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/id")
-      }
+      client.prepareDelete(index, estype, key).get
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
index 0379c90..3781a4b 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala
@@ -15,180 +15,116 @@
  * limitations under the License.
  */
 
-package org.apache.predictionio.data.storage.elasticsearch
-
-import java.io.IOException
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.entity.ContentType
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.util.EntityUtils
+import grizzled.slf4j.Logging
+import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.App
 import org.apache.predictionio.data.storage.Apps
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
-import org.json4s._
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
 import org.json4s.JsonDSL._
+import org.json4s._
 import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-
 /** Elasticsearch implementation of Items. */
-class ESApps(client: ESClient, config: StorageClientConfig, index: String)
-    extends Apps with Logging {
+class ESApps(client: Client, config: StorageClientConfig, index: String)
+  extends Apps with Logging {
   implicit val formats = DefaultFormats.lossless
   private val estype = "apps"
   private val seq = new ESSequences(client, config, index)
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index)
-    val mappingJson =
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = 
indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
         ("properties" ->
-          ("id" -> ("type" -> "keyword")) ~
-          ("name" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, 
compact(render(mappingJson)))
-  } finally {
-    restClient.close()
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
   }
 
   def insert(app: App): Option[Int] = {
     val id =
       if (app.id == 0) {
-        var roll = seq.genNext(estype)
-        while (!get(roll).isEmpty) roll = seq.genNext(estype)
+        var roll = seq.genNext("apps")
+        while (!get(roll).isEmpty) roll = seq.genNext("apps")
         roll
-      } else app.id
-    update(app.copy(id = id))
+      }
+      else app.id
+    val realapp = app.copy(id = id)
+    update(realapp)
     Some(id)
   }
 
   def get(id: Int): Option[App] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "GET",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      (jsonResponse \ "found").extract[Boolean] match {
-        case true =>
-          Some((jsonResponse \ "_source").extract[App])
-        case _ =>
-          None
-      }
+      val response = client.prepareGet(
+        index,
+        estype,
+        id.toString).get()
+      Some(read[App](response.getSourceAsString))
     } catch {
-      case e: ResponseException =>
-        e.getResponse.getStatusLine.getStatusCode match {
-          case 404 => None
-          case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
-            None
-        }
-      case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+      case e: ElasticsearchException =>
+        error(e.getMessage)
         None
-    } finally {
-      restClient.close()
+      case e: NullPointerException => None
     }
   }
 
   def getByName(name: String): Option[App] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("term" ->
-            ("name" -> name)))
-      val entity = new NStringEntity(compact(render(json)), 
ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/_search",
-        Map.empty[String, String].asJava,
-        entity)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      (jsonResponse \ "hits" \ "total").extract[Long] match {
-        case 0 => None
-        case _ =>
-          val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
-          val result = (results.head \ "_source").extract[App]
-          Some(result)
+      val response = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("name", name)).get
+      val hits = response.getHits().hits()
+      if (hits.size > 0) {
+        Some(read[App](hits.head.getSourceAsString))
+      } else {
+        None
       }
     } catch {
-      case e: IOException =>
-        error(s"Failed to access to /$index/$estype/_search", e)
+      case e: ElasticsearchException =>
+        error(e.getMessage)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getAll(): Seq[App] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("match_all" -> List.empty))
-      ESUtils.getAll[App](restClient, index, estype, compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[App](client, builder)
     } catch {
-      case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[App]()
     }
   }
 
   def update(app: App): Unit = {
-    val id = app.id.toString
-    val restClient = client.open()
     try {
-      val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON);
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava,
-        entity)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      val result = (jsonResponse \ "result").extract[String]
-      result match {
-        case "created" =>
-        case "updated" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      val response = client.prepareIndex(index, estype, app.id.toString).
+        setSource(write(app)).get()
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
     }
   }
 
   def delete(id: Int): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "DELETE",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val json = parse(EntityUtils.toString(response.getEntity))
-      val result = (json \ "result").extract[String]
-      result match {
-        case "deleted" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      client.prepareDelete(index, estype, id.toString).get
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
index b319c26..52697fd 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala
@@ -15,151 +15,103 @@
  * limitations under the License.
  */
 
-package org.apache.predictionio.data.storage.elasticsearch
-
-import java.io.IOException
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.entity.ContentType
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.util.EntityUtils
+import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.Channel
 import org.apache.predictionio.data.storage.Channels
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
-import org.json4s._
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders.termFilter
+import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-
-class ESChannels(client: ESClient, config: StorageClientConfig, index: String)
+class ESChannels(client: Client, config: StorageClientConfig, index: String)
     extends Channels with Logging {
+
   implicit val formats = DefaultFormats.lossless
   private val estype = "channels"
   private val seq = new ESSequences(client, config, index)
+  private val seqName = "channels"
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index)
-    val mappingJson =
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = 
indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
         ("properties" ->
-          ("name" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, 
compact(render(mappingJson)))
-  } finally {
-    restClient.close()
+          ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
   }
 
   def insert(channel: Channel): Option[Int] = {
     val id =
       if (channel.id == 0) {
-        var roll = seq.genNext(estype)
-        while (!get(roll).isEmpty) roll = seq.genNext(estype)
+        var roll = seq.genNext(seqName)
+        while (!get(roll).isEmpty) roll = seq.genNext(seqName)
         roll
       } else channel.id
 
-    if (update(channel.copy(id = id))) Some(id) else None
+    val realChannel = channel.copy(id = id)
+    if (update(realChannel)) Some(id) else None
   }
 
   def get(id: Int): Option[Channel] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "GET",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      (jsonResponse \ "found").extract[Boolean] match {
-        case true =>
-          Some((jsonResponse \ "_source").extract[Channel])
-        case _ =>
-          None
-      }
+      val response = client.prepareGet(
+        index,
+        estype,
+        id.toString).get()
+      Some(read[Channel](response.getSourceAsString))
     } catch {
-      case e: ResponseException =>
-        e.getResponse.getStatusLine.getStatusCode match {
-          case 404 => None
-          case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
-            None
-        }
-      case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+      case e: ElasticsearchException =>
+        error(e.getMessage)
         None
-    } finally {
-      restClient.close()
+      case e: NullPointerException => None
     }
   }
 
   def getByAppid(appid: Int): Seq[Channel] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("term" ->
-            ("appid" -> appid)))
-      ESUtils.getAll[Channel](restClient, index, estype, compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype).
+        setPostFilter(termFilter("appid", appid))
+      ESUtils.getAll[Channel](client, builder)
     } catch {
-      case e: IOException =>
-        error(s"Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq[Channel]()
     }
   }
 
   def update(channel: Channel): Boolean = {
-    val id = channel.id.toString
-    val restClient = client.open()
     try {
-      val entity = new NStringEntity(write(channel), 
ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava,
-        entity)
-      val json = parse(EntityUtils.toString(response.getEntity))
-      val result = (json \ "result").extract[String]
-      result match {
-        case "created" => true
-        case "updated" => true
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-          false
-      }
+      val response = client.prepareIndex(index, estype, channel.id.toString).
+        setSource(write(channel)).get()
+      true
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
+      case e: ElasticsearchException =>
+        error(e.getMessage)
         false
-    } finally {
-      restClient.close()
     }
   }
 
   def delete(id: Int): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "DELETE",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      val result = (jsonResponse \ "result").extract[String]
-      result match {
-        case "deleted" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      client.prepareDelete(index, estype, id.toString).get
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
index 68cdeac..21690bf 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala
@@ -15,234 +15,144 @@
  * limitations under the License.
  */
 
-package org.apache.predictionio.data.storage.elasticsearch
-
-import java.io.IOException
 
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.entity.ContentType
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.util.EntityUtils
+import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.EngineInstance
 import org.apache.predictionio.data.storage.EngineInstanceSerializer
 import org.apache.predictionio.data.storage.EngineInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
-import org.json4s._
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
 import org.json4s.JsonDSL._
+import org.json4s._
 import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-
-class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: 
String)
-    extends EngineInstances with Logging {
+class ESEngineInstances(client: Client, config: StorageClientConfig, index: 
String)
+  extends EngineInstances with Logging {
   implicit val formats = DefaultFormats + new EngineInstanceSerializer
   private val estype = "engine_instances"
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index)
-    val mappingJson =
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = 
indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
         ("properties" ->
-          ("status" -> ("type" -> "keyword")) ~
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
           ("startTime" -> ("type" -> "date")) ~
           ("endTime" -> ("type" -> "date")) ~
-          ("engineId" -> ("type" -> "keyword")) ~
-          ("engineVersion" -> ("type" -> "keyword")) ~
-          ("engineVariant" -> ("type" -> "keyword")) ~
-          ("engineFactory" -> ("type" -> "keyword")) ~
-          ("batch" -> ("type" -> "keyword")) ~
-          ("dataSourceParams" -> ("type" -> "keyword")) ~
-          ("preparatorParams" -> ("type" -> "keyword")) ~
-          ("algorithmsParams" -> ("type" -> "keyword")) ~
-          ("servingParams" -> ("type" -> "keyword")) ~
-          ("status" -> ("type" -> "keyword"))))
-    ESUtils.createMapping(restClient, index, estype, 
compact(render(mappingJson)))
-  } finally {
-    restClient.close()
+          ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVersion" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineVariant" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineFactory" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("batch" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("dataSourceParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("preparatorParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("algorithmsParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("servingParams" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
   }
 
   def insert(i: EngineInstance): String = {
-    val id = i.id match {
-      case x if x.isEmpty =>
-        @scala.annotation.tailrec
-        def generateId(newId: Option[String]): String = {
-          newId match {
-            case Some(x) => x
-            case _ => generateId(preInsert())
-          }
-        }
-        generateId(preInsert())
-      case x => x
-    }
-
-    update(i.copy(id = id))
-    id
-  }
-
-  def preInsert(): Option[String] = {
-    val restClient = client.open()
     try {
-      val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/",
-        Map.empty[String, String].asJava,
-        entity)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      val result = (jsonResponse \ "result").extract[String]
-      result match {
-        case "created" =>
-          Some((jsonResponse \ "_id").extract[String])
-        case _ =>
-          error(s"[$result] Failed to create $index/$estype")
-          None
-      }
+      val response = client.prepareIndex(index, estype).
+        setSource(write(i)).get
+      response.getId
     } catch {
-      case e: IOException =>
-        error(s"Failed to create $index/$estype", e)
-        None
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
     }
   }
 
   def get(id: String): Option[EngineInstance] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "GET",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      (jsonResponse \ "found").extract[Boolean] match {
-        case true =>
-          Some((jsonResponse \ "_source").extract[EngineInstance])
-        case _ =>
-          None
+      val response = client.prepareGet(index, estype, id).get
+      if (response.isExists) {
+        Some(read[EngineInstance](response.getSourceAsString))
+      } else {
+        None
       }
     } catch {
-      case e: ResponseException =>
-        e.getResponse.getStatusLine.getStatusCode match {
-          case 404 => None
-          case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
-            None
-        }
-      case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+      case e: ElasticsearchException =>
+        error(e.getMessage)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getAll(): Seq[EngineInstance] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("match_all" -> List.empty))
-      ESUtils.getAll[EngineInstance](restClient, index, estype, 
compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EngineInstance](client, builder)
     } catch {
-      case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
     }
   }
 
   def getCompleted(
-    engineId: String,
-    engineVersion: String,
-    engineVariant: String): Seq[EngineInstance] = {
-    val restClient = client.open()
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Seq[EngineInstance] = {
     try {
-      val json =
-        ("query" ->
-          ("bool" ->
-            ("must" -> List(
-              ("term" ->
-                ("status" -> "COMPLETED")),
-              ("term" ->
-                ("engineId" -> engineId)),
-              ("term" ->
-                ("engineVersion" -> engineVersion)),
-              ("term" ->
-                ("engineVariant" -> engineVariant)))))) ~
-              ("sort" -> List(
-                ("startTime" ->
-                  ("order" -> "desc"))))
-      ESUtils.getAll[EngineInstance](restClient, index, estype, 
compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+        andFilter(
+          termFilter("status", "COMPLETED"),
+          termFilter("engineId", engineId),
+          termFilter("engineVersion", engineVersion),
+          termFilter("engineVariant", engineVariant))).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EngineInstance](client, builder)
     } catch {
-      case e: IOException =>
-        error(s"Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
     }
   }
 
   def getLatestCompleted(
-    engineId: String,
-    engineVersion: String,
-    engineVariant: String): Option[EngineInstance] =
+      engineId: String,
+      engineVersion: String,
+      engineVariant: String): Option[EngineInstance] =
     getCompleted(
       engineId,
       engineVersion,
       engineVariant).headOption
 
   def update(i: EngineInstance): Unit = {
-    val id = i.id
-    val restClient = client.open()
     try {
-      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava,
-        entity)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      val result = (jsonResponse \ "result").extract[String]
-      result match {
-        case "created" =>
-        case "updated" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException => error(e.getMessage)
     }
   }
 
   def delete(id: String): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "DELETE",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val json = parse(EntityUtils.toString(response.getEntity))
-      val result = (json \ "result").extract[String]
-      result match {
-        case "deleted" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      val response = client.prepareDelete(index, estype, id).get
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException => error(e.getMessage)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
index 1f798f0..85bf820 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala
@@ -15,180 +15,122 @@
  * limitations under the License.
  */
 
-package org.apache.predictionio.data.storage.elasticsearch
-
-import java.io.IOException
 
-import scala.collection.JavaConverters._
+package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.entity.ContentType
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.util.EntityUtils
+import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.EvaluationInstance
 import org.apache.predictionio.data.storage.EvaluationInstanceSerializer
 import org.apache.predictionio.data.storage.EvaluationInstances
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.RestClient
-import org.json4s._
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
+import org.elasticsearch.index.query.FilterBuilders._
+import org.elasticsearch.search.sort.SortOrder
 import org.json4s.JsonDSL._
+import org.json4s._
 import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
 import org.json4s.native.Serialization.write
 
-import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-
-class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, 
index: String)
-    extends EvaluationInstances with Logging {
+class ESEvaluationInstances(client: Client, config: StorageClientConfig, 
index: String)
+  extends EvaluationInstances with Logging {
   implicit val formats = DefaultFormats + new EvaluationInstanceSerializer
   private val estype = "evaluation_instances"
-  private val seq = new ESSequences(client, config, index)
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index)
-    val mappingJson =
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = 
indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
+    val json =
       (estype ->
-        ("_all" -> ("enabled" -> 0)) ~
         ("properties" ->
-          ("status" -> ("type" -> "keyword")) ~
+          ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
           ("startTime" -> ("type" -> "date")) ~
           ("endTime" -> ("type" -> "date")) ~
-          ("evaluationClass" -> ("type" -> "keyword")) ~
-          ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~
-          ("batch" -> ("type" -> "keyword")) ~
-          ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~
-          ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~
-          ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no"))))
-    ESUtils.createMapping(restClient, index, estype, 
compact(render(mappingJson)))
-  } finally {
-    restClient.close()
+          ("evaluationClass" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("engineParamsGeneratorClass" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("batch" ->
+            ("type" -> "string") ~ ("index" -> "not_analyzed")) ~
+          ("evaluatorResults" ->
+            ("type" -> "string") ~ ("index" -> "no")) ~
+          ("evaluatorResultsHTML" ->
+            ("type" -> "string") ~ ("index" -> "no")) ~
+          ("evaluatorResultsJSON" ->
+            ("type" -> "string") ~ ("index" -> "no"))))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(json))).get
   }
 
   def insert(i: EvaluationInstance): String = {
-    val id = i.id match {
-      case x if x.isEmpty =>
-        var roll = seq.genNext(estype).toString
-        while (!get(roll).isEmpty) roll = seq.genNext(estype).toString
-        roll
-      case x => x
+    try {
+      val response = client.prepareIndex(index, estype).
+        setSource(write(i)).get
+      response.getId
+    } catch {
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        ""
     }
-
-    update(i.copy(id = id))
-    id
   }
 
   def get(id: String): Option[EvaluationInstance] = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "GET",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      (jsonResponse \ "found").extract[Boolean] match {
-        case true =>
-          Some((jsonResponse \ "_source").extract[EvaluationInstance])
-        case _ =>
-          None
+      val response = client.prepareGet(index, estype, id).get
+      if (response.isExists) {
+        Some(read[EvaluationInstance](response.getSourceAsString))
+      } else {
+        None
       }
     } catch {
-      case e: ResponseException =>
-        e.getResponse.getStatusLine.getStatusCode match {
-          case 404 => None
-          case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
-            None
-        }
-      case e: IOException =>
-        error(s"Failed to access to /$index/$estype/$id", e)
+      case e: ElasticsearchException =>
+        error(e.getMessage)
         None
-    } finally {
-      restClient.close()
     }
   }
 
   def getAll(): Seq[EvaluationInstance] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("match_all" -> List.empty))
-      ESUtils.getAll[EvaluationInstance](restClient, index, estype, 
compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype)
+      ESUtils.getAll[EvaluationInstance](client, builder)
     } catch {
-      case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
     }
   }
 
   def getCompleted(): Seq[EvaluationInstance] = {
-    val restClient = client.open()
     try {
-      val json =
-        ("query" ->
-          ("term" ->
-            ("status" -> "EVALCOMPLETED"))) ~
-            ("sort" ->
-              ("startTime" ->
-                ("order" -> "desc")))
-      ESUtils.getAll[EvaluationInstance](restClient, index, estype, 
compact(render(json)))
+      val builder = client.prepareSearch(index).setTypes(estype).setPostFilter(
+        termFilter("status", "EVALCOMPLETED")).
+        addSort("startTime", SortOrder.DESC)
+      ESUtils.getAll[EvaluationInstance](client, builder)
     } catch {
-      case e: IOException =>
-        error("Failed to access to /$index/$estype/_search", e)
-        Nil
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        Seq()
     }
   }
 
   def update(i: EvaluationInstance): Unit = {
-    val id = i.id
-    val restClient = client.open()
     try {
-      val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava,
-        entity)
-      val json = parse(EntityUtils.toString(response.getEntity))
-      val result = (json \ "result").extract[String]
-      result match {
-        case "created" =>
-        case "updated" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException => error(e.getMessage)
     }
   }
 
   def delete(id: String): Unit = {
-    val restClient = client.open()
     try {
-      val response = restClient.performRequest(
-        "DELETE",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava)
-      val json = parse(EntityUtils.toString(response.getEntity))
-      val result = (json \ "result").extract[String]
-      result match {
-        case "deleted" =>
-        case _ =>
-          error(s"[$result] Failed to update $index/$estype/$id")
-      }
+      client.prepareDelete(index, estype, id).get
     } catch {
-      case e: IOException =>
-        error(s"Failed to update $index/$estype/$id", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException => error(e.getMessage)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
deleted file mode 100644
index f2ab7c2..0000000
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.MapWritable
-import org.apache.hadoop.io.Text
-import org.apache.predictionio.data.storage.DataMap
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.EventValidation
-import org.joda.time.DateTime
-import org.joda.time.DateTimeZone
-import org.json4s._
-
-object ESEventsUtil {
-
-  implicit val formats = DefaultFormats
-
-  def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = {
-
-    def getStringCol(col: String): String = {
-      val r = result.get(new Text(col)).asInstanceOf[Text]
-      require(r != null,
-        s"Failed to get value for column ${col}. " +
-          s"StringBinary: ${r.getBytes()}.")
-
-      r.toString()
-    }
-
-    def getOptStringCol(col: String): Option[String] = {
-      val r = result.get(new Text(col))
-      if (r == null) {
-        None
-      } else {
-        Some(r.asInstanceOf[Text].toString())
-      }
-    }
-
-    val tmp = result
-      .get(new Text("properties")).asInstanceOf[MapWritable]
-      .get(new Text("fields")).asInstanceOf[MapWritable]
-      .get(new Text("rating"))
-
-    val rating =
-      if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable]
-      else if (tmp.isInstanceOf[LongWritable]) {
-        new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble)
-      }
-      else null
-
-    val properties: DataMap =
-      if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""")
-      else DataMap()
-
-
-    val eventId = Some(getStringCol("eventId"))
-    val event = getStringCol("event")
-    val entityType = getStringCol("entityType")
-    val entityId = getStringCol("entityId")
-    val targetEntityType = getOptStringCol("targetEntityType")
-    val targetEntityId = getOptStringCol("targetEntityId")
-    val prId = getOptStringCol("prId")
-    val eventTimeZone = getOptStringCol("eventTimeZone")
-      .map(DateTimeZone.forID(_))
-      .getOrElse(EventValidation.defaultTimeZone)
-    val eventTime = new DateTime(
-      getStringCol("eventTime"), eventTimeZone)
-    val creationTimeZone = getOptStringCol("creationTimeZone")
-      .map(DateTimeZone.forID(_))
-      .getOrElse(EventValidation.defaultTimeZone)
-    val creationTime: DateTime = new DateTime(
-      getStringCol("creationTime"), creationTimeZone)
-
-
-    Event(
-      eventId = eventId,
-      event = event,
-      entityType = entityType,
-      entityId = entityId,
-      targetEntityType = targetEntityType,
-      targetEntityId = targetEntityId,
-      properties = properties,
-      eventTime = eventTime,
-      tags = Seq(),
-      prId = prId,
-      creationTime = creationTime
-    )
-  }
-
-  def eventToPut(event: Event, appId: Int): Seq[Map[String, Any]] = {
-    Seq(
-      Map(
-        "eventId" -> event.eventId,
-        "event" -> event.event,
-        "entityType" -> event.entityType,
-        "entityId" -> event.entityId,
-        "targetEntityType" -> event.targetEntityType,
-        "targetEntityId" -> event.targetEntityId,
-        "properties" -> event.properties,
-        "eventTime" -> event.eventTime,
-        "tags" -> event.tags,
-        "prId" -> event.prId,
-        "creationTime" -> event.creationTime
-      )
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
deleted file mode 100644
index b4f7dc5..0000000
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import java.io.IOException
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-
-import org.apache.http.entity.ContentType
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.util.EntityUtils
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.LEvents
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.client.RestClient
-import org.joda.time.DateTime
-import org.json4s._
-import org.json4s.JsonDSL._
-import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.write
-import org.json4s.ext.JodaTimeSerializers
-
-import grizzled.slf4j.Logging
-import org.elasticsearch.client.ResponseException
-
-class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: 
String)
-    extends LEvents with Logging {
-  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
-  private val seq = new ESSequences(client, config, index)
-  private val seqName = "events"
-
-  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
-    channelId.map { ch =>
-      s"${appId}_${ch}"
-    }.getOrElse {
-      s"${appId}"
-    }
-  }
-
-  override def init(appId: Int, channelId: Option[Int] = None): Boolean = {
-    val estype = getEsType(appId, channelId)
-    val restClient = client.open()
-    try {
-      ESUtils.createIndex(restClient, index)
-      val json =
-        (estype ->
-          ("_all" -> ("enabled" -> 0)) ~
-          ("properties" ->
-            ("name" -> ("type" -> "keyword")) ~
-            ("eventId" -> ("type" -> "keyword")) ~
-            ("event" -> ("type" -> "keyword")) ~
-            ("entityType" -> ("type" -> "keyword")) ~
-            ("entityId" -> ("type" -> "keyword")) ~
-            ("targetEntityType" -> ("type" -> "keyword")) ~
-            ("targetEntityId" -> ("type" -> "keyword")) ~
-            ("properties" ->
-              ("type" -> "nested") ~
-              ("properties" ->
-                ("fields" -> ("type" -> "nested") ~
-                  ("properties" ->
-                    ("user" -> ("type" -> "long")) ~
-                    ("num" -> ("type" -> "long")))))) ~
-                    ("eventTime" -> ("type" -> "date")) ~
-                    ("tags" -> ("type" -> "keyword")) ~
-                    ("prId" -> ("type" -> "keyword")) ~
-                    ("creationTime" -> ("type" -> "date"))))
-      ESUtils.createMapping(restClient, index, estype, compact(render(json)))
-    } finally {
-      restClient.close()
-    }
-    true
-  }
-
-  override def remove(appId: Int, channelId: Option[Int] = None): Boolean = {
-    val estype = getEsType(appId, channelId)
-    val restClient = client.open()
-    try {
-      val json =
-        ("query" ->
-          ("match_all" -> List.empty))
-      val entity = new NStringEntity(compact(render(json)), 
ContentType.APPLICATION_JSON)
-      restClient.performRequest(
-        "POST",
-        s"/$index/$estype/_delete_by_query",
-        Map.empty[String, String].asJava,
-        entity).getStatusLine.getStatusCode match {
-          case 200 => true
-          case _ =>
-            error(s"Failed to remove $index/$estype")
-            false
-        }
-    } catch {
-      case e: Exception =>
-        error(s"Failed to remove $index/$estype", e)
-        false
-    } finally {
-      restClient.close()
-    }
-  }
-
-  override def close(): Unit = {
-    // nothing
-  }
-
-  override def futureInsert(
-    event: Event,
-    appId: Int,
-    channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = {
-    Future {
-      val estype = getEsType(appId, channelId)
-      val restClient = client.open()
-      try {
-        val id = event.eventId.getOrElse {
-          var roll = seq.genNext(seqName)
-          while (exists(restClient, estype, roll)) roll = seq.genNext(seqName)
-          roll.toString
-        }
-        val json = write(event.copy(eventId = Some(id)))
-        val entity = new NStringEntity(json, ContentType.APPLICATION_JSON);
-        val response = restClient.performRequest(
-          "POST",
-          s"/$index/$estype/$id",
-          Map.empty[String, String].asJava,
-          entity)
-        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-        val result = (jsonResponse \ "result").extract[String]
-        result match {
-          case "created" => id
-          case "updated" => id
-          case _ =>
-            error(s"[$result] Failed to update $index/$estype/$id")
-            ""
-        }
-      } catch {
-        case e: IOException =>
-          error(s"Failed to update $index/$estype/<id>", e)
-          ""
-      } finally {
-        restClient.close()
-      }
-    }
-  }
-
-  private def exists(restClient: RestClient, estype: String, id: Int): Boolean 
= {
-    try {
-      restClient.performRequest(
-        "GET",
-        s"/$index/$estype/$id",
-        Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
-          case 200 => true
-          case _ => false
-        }
-    } catch {
-      case e: ResponseException =>
-        e.getResponse.getStatusLine.getStatusCode match {
-          case 404 => false
-          case _ =>
-            error(s"Failed to access to /$index/$estype/$id", e)
-            false
-        }
-      case e: IOException =>
-        error(s"Failed to access to $index/$estype/$id", e)
-        false
-    }
-  }
-
-  override def futureGet(
-    eventId: String,
-    appId: Int,
-    channelId: Option[Int])(implicit ec: ExecutionContext): 
Future[Option[Event]] = {
-    Future {
-      val estype = getEsType(appId, channelId)
-      val restClient = client.open()
-      try {
-        val json =
-          ("query" ->
-            ("term" ->
-              ("eventId" -> eventId)))
-        val entity = new NStringEntity(compact(render(json)), 
ContentType.APPLICATION_JSON)
-        val response = restClient.performRequest(
-          "POST",
-          s"/$index/$estype/_search",
-          Map.empty[String, String].asJava,
-          entity)
-        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-        (jsonResponse \ "hits" \ "total").extract[Long] match {
-          case 0 => None
-          case _ =>
-            val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]]
-            val result = (results.head \ "_source").extract[Event]
-            Some(result)
-        }
-      } catch {
-        case e: IOException =>
-          error("Failed to access to /$index/$estype/_search", e)
-          None
-      } finally {
-        restClient.close()
-      }
-    }
-  }
-
-  override def futureDelete(
-    eventId: String,
-    appId: Int,
-    channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = {
-    Future {
-      val estype = getEsType(appId, channelId)
-      val restClient = client.open()
-      try {
-        val json =
-          ("query" ->
-            ("term" ->
-              ("eventId" -> eventId)))
-        val entity = new NStringEntity(compact(render(json)), 
ContentType.APPLICATION_JSON)
-        val response = restClient.performRequest(
-          "POST",
-          s"/$index/$estype/_delete_by_query",
-          Map.empty[String, String].asJava)
-        val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-        val result = (jsonResponse \ "result").extract[String]
-        result match {
-          case "deleted" => true
-          case _ =>
-            error(s"[$result] Failed to update $index/$estype:$eventId")
-            false
-        }
-      } catch {
-        case e: IOException =>
-          error(s"Failed to update $index/$estype:$eventId", e)
-          false
-      } finally {
-        restClient.close()
-      }
-    }
-  }
-
-  override def futureFind(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    limit: Option[Int] = None,
-    reversed: Option[Boolean] = None)
-    (implicit ec: ExecutionContext): Future[Iterator[Event]] = {
-    Future {
-      val estype = getEsType(appId, channelId)
-      val restClient = client.open()
-      try {
-        val query = ESUtils.createEventQuery(
-          startTime, untilTime, entityType, entityId,
-          eventNames, targetEntityType, targetEntityId, None)
-        ESUtils.getAll[Event](restClient, index, estype, query).toIterator
-      } catch {
-        case e: IOException =>
-          error(e.getMessage)
-          Iterator[Event]()
-      } finally {
-        restClient.close()
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
deleted file mode 100644
index 5784b3f..0000000
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.data.storage.elasticsearch
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.MapWritable
-import org.apache.hadoop.io.Text
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.PEvents
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.elasticsearch.client.RestClient
-import org.elasticsearch.hadoop.mr.EsInputFormat
-import org.elasticsearch.spark._
-import org.joda.time.DateTime
-import java.io.IOException
-import org.apache.http.util.EntityUtils
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.entity.ContentType
-import org.json4s._
-import org.json4s.JsonDSL._
-import org.json4s.native.JsonMethods._
-import org.json4s.ext.JodaTimeSerializers
-
-
-class ESPEvents(client: ESClient, config: StorageClientConfig, index: String)
-    extends PEvents {
-  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
-
-  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
-    channelId.map { ch =>
-      s"${appId}_${ch}"
-    }.getOrElse {
-      s"${appId}"
-    }
-  }
-
-  def getESNodes(): String = {
-    val hosts = config.properties.get("HOSTS").
-      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
-    val ports = config.properties.get("PORTS").
-      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
-    val schemes = config.properties.get("SCHEMES").
-      map(_.split(",").toSeq).getOrElse(Seq("http"))
-    (hosts, ports, schemes).zipped.map(
-      (h, p, s) => s"$s://$h:$p").mkString(",")
-  }
-
-  override def find(
-    appId: Int,
-    channelId: Option[Int] = None,
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): 
RDD[Event] = {
-
-    val query = ESUtils.createEventQuery(
-      startTime, untilTime, entityType, entityId,
-      eventNames, targetEntityType, targetEntityId, None)
-
-    val estype = getEsType(appId, channelId)
-    val conf = new Configuration()
-    conf.set("es.resource", s"$index/$estype")
-    conf.set("es.query", query)
-    conf.set("es.nodes", getESNodes())
-
-    val rdd = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, 
MapWritable]],
-      classOf[Text], classOf[MapWritable]).map {
-        case (key, doc) => {
-          ESEventsUtil.resultToEvent(key, doc, appId)
-        }
-      }
-
-    rdd
-  }
-
-  override def write(
-    events: RDD[Event],
-    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-    val estype = getEsType(appId, channelId)
-    events.map { event =>
-      ESEventsUtil.eventToPut(event, appId)
-    }.saveToEs(s"$index/$estype")
-  }
-
-  override def delete(
-    eventIds: RDD[String],
-    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
-    val estype = getEsType(appId, channelId)
-    val restClient = client.open()
-    try {
-      eventIds.foreachPartition { iter =>
-        iter.foreach { eventId =>
-          try {
-            val json =
-              ("query" ->
-                ("term" ->
-                  ("eventId" -> eventId)))
-            val entity = new NStringEntity(compact(render(json)), 
ContentType.APPLICATION_JSON)
-            val response = restClient.performRequest(
-              "POST",
-              s"/$index/$estype/_delete_by_query",
-              Map.empty[String, String].asJava)
-            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-            val result = (jsonResponse \ "result").extract[String]
-            result match {
-              case "deleted" => true
-              case _ =>
-                logger.error(s"[$result] Failed to update 
$index/$estype:$eventId")
-                false
-            }
-          } catch {
-            case e: IOException =>
-              logger.error(s"Failed to update $index/$estype:$eventId", e)
-              false
-          }
-        }
-      }
-    } finally {
-      restClient.close()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 4eb8cd7..5c9e170 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -15,65 +15,50 @@
  * limitations under the License.
  */
 
-package org.apache.predictionio.data.storage.elasticsearch
-
-import java.io.IOException
 
-import scala.collection.JavaConverters._
+package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.Header
-import org.apache.http.entity.ContentType
-import org.apache.http.nio.entity.NStringEntity
-import org.apache.http.util.EntityUtils
+import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.RestClient
-import org.json4s._
+import org.elasticsearch.ElasticsearchException
+import org.elasticsearch.client.Client
 import org.json4s.JsonDSL._
+import org.json4s._
 import org.json4s.native.JsonMethods._
-import org.json4s.native.Serialization.write
-
-import grizzled.slf4j.Logging
 
-class ESSequences(client: ESClient, config: StorageClientConfig, index: 
String) extends Logging {
+class ESSequences(client: Client, config: StorageClientConfig, index: String) 
extends Logging {
   implicit val formats = DefaultFormats
   private val estype = "sequences"
 
-  val restClient = client.open()
-  try {
-    ESUtils.createIndex(restClient, index)
+  val indices = client.admin.indices
+  val indexExistResponse = indices.prepareExists(index).get
+  if (!indexExistResponse.isExists) {
+    // val settingsJson =
+    //   ("number_of_shards" -> 1) ~
+    //   ("auto_expand_replicas" -> "0-all")
+    indices.prepareCreate(index).get
+  }
+  val typeExistResponse = 
indices.prepareTypesExists(index).setTypes(estype).get
+  if (!typeExistResponse.isExists) {
     val mappingJson =
       (estype ->
-        ("_all" -> ("enabled" -> 0)))
-    ESUtils.createMapping(restClient, index, estype, 
compact(render(mappingJson)))
-  } finally {
-    restClient.close()
+        ("_source" -> ("enabled" -> 0)) ~
+        ("_all" -> ("enabled" -> 0)) ~
+        ("_type" -> ("index" -> "no")) ~
+        ("enabled" -> 0))
+    indices.preparePutMapping(index).setType(estype).
+      setSource(compact(render(mappingJson))).get
   }
 
   def genNext(name: String): Int = {
-    val restClient = client.open()
     try {
-      val entity = new NStringEntity(write("n" -> name), 
ContentType.APPLICATION_JSON)
-      val response = restClient.performRequest(
-        "POST",
-        s"/$index/$estype/$name",
-        Map.empty[String, String].asJava,
-        entity)
-      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
-      val result = (jsonResponse \ "result").extract[String]
-      result match {
-        case "created" =>
-          (jsonResponse \ "_version").extract[Int]
-        case "updated" =>
-          (jsonResponse \ "_version").extract[Int]
-        case _ =>
-          throw new IllegalStateException(s"[$result] Failed to update 
$index/$estype/$name")
-      }
+      val response = client.prepareIndex(index, estype, name).
+        setSource(compact(render("n" -> name))).get
+      response.getVersion().toInt
     } catch {
-      case e: IOException =>
-        throw new StorageClientException(s"Failed to update 
$index/$estype/$name", e)
-    } finally {
-      restClient.close()
+      case e: ElasticsearchException =>
+        error(e.getMessage)
+        0
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index db841b6..f5c99bf 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -15,149 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.predictionio.data.storage.elasticsearch
 
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.entity.ContentType
-import org.apache.http.entity.StringEntity
-import org.apache.http.nio.entity.NStringEntity
-import org.elasticsearch.client.RestClient
-import org.json4s._
-import org.json4s.JsonDSL._
-import org.json4s.native.JsonMethods._
+import org.elasticsearch.action.search.SearchRequestBuilder
+import org.elasticsearch.client.Client
+import org.elasticsearch.common.unit.TimeValue
+import org.json4s.Formats
 import org.json4s.native.Serialization.read
-import org.apache.http.util.EntityUtils
-import org.joda.time.DateTime
-import org.joda.time.format.DateTimeFormat
-import org.joda.time.DateTimeZone
-import org.apache.predictionio.data.storage.StorageClientConfig
-import org.apache.http.HttpHost
+
+import scala.collection.mutable.ArrayBuffer
 
 object ESUtils {
-  val scrollLife = "1m"
+  val scrollLife = new TimeValue(60000)
 
-  def getAll[T: Manifest](
-    client: RestClient,
-    index: String,
-    estype: String,
-    query: String)(
+  def getAll[T : Manifest](
+      client: Client,
+      builder: SearchRequestBuilder)(
       implicit formats: Formats): Seq[T] = {
-
-    @scala.annotation.tailrec
-    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = 
{
-      if (hits.isEmpty) results
-      else {
-        val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
-        val scrollBody = new StringEntity(compact(render(json)))
-        val response = client.performRequest(
-          "POST",
-          "/_search/scroll",
-          Map[String, String](),
-          scrollBody)
-        val responseJValue = parse(EntityUtils.toString(response.getEntity))
-        scroll((responseJValue \ "_scroll_id").extract[String],
-          (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
-          hits.map(h => (h \ "_source").extract[T]) ++ results)
-      }
+    val results = ArrayBuffer[T]()
+    var response = builder.setScroll(scrollLife).get
+    var hits = response.getHits().hits()
+    results ++= hits.map(h => read[T](h.getSourceAsString))
+    while (hits.size > 0) {
+      response = client.prepareSearchScroll(response.getScrollId).
+        setScroll(scrollLife).get
+      hits = response.getHits().hits()
+      results ++= hits.map(h => read[T](h.getSourceAsString))
     }
-
-    val response = client.performRequest(
-      "POST",
-      s"/$index/$estype/_search",
-      Map("scroll" -> scrollLife),
-      new StringEntity(query))
-    val responseJValue = parse(EntityUtils.toString(response.getEntity))
-    scroll((responseJValue \ "_scroll_id").extract[String],
-        (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
-        Nil)
-  }
-
-  def createIndex(
-    client: RestClient,
-    index: String): Unit = {
-    client.performRequest(
-      "HEAD",
-      s"/$index",
-      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
-        case 404 =>
-          client.performRequest(
-            "PUT",
-            s"/$index",
-            Map.empty[String, String].asJava)
-        case 200 =>
-        case _ =>
-          throw new IllegalStateException(s"/$index is invalid.")
-      }
-  }
-
-  def createMapping(
-    client: RestClient,
-    index: String,
-    estype: String,
-    json: String): Unit = {
-    client.performRequest(
-      "HEAD",
-      s"/$index/_mapping/$estype",
-      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
-        case 404 =>
-          val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
-          client.performRequest(
-            "PUT",
-            s"/$index/_mapping/$estype",
-            Map.empty[String, String].asJava,
-            entity)
-        case 200 =>
-        case _ =>
-          throw new IllegalStateException(s"/$index/$estype is invalid: $json")
-      }
-  }
-
-  def createEventQuery(
-    startTime: Option[DateTime] = None,
-    untilTime: Option[DateTime] = None,
-    entityType: Option[String] = None,
-    entityId: Option[String] = None,
-    eventNames: Option[Seq[String]] = None,
-    targetEntityType: Option[Option[String]] = None,
-    targetEntityId: Option[Option[String]] = None,
-    reversed: Option[Boolean] = None): String = {
-    val mustQueries = Seq(
-      startTime.map(x => {
-        val v = DateTimeFormat
-          
.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
-        s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
-      }),
-      untilTime.map(x => {
-        val v = DateTimeFormat
-          
.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
-        s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
-      }),
-      entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),
-      entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""),
-      targetEntityType.flatMap(xx => xx.map(x => 
s"""{"term":{"targetEntityType":"${x}"}}""")),
-      targetEntityId.flatMap(xx => xx.map(x => 
s"""{"term":{"targetEntityId":"${x}"}}""")),
-      eventNames
-        .map { xx => xx.map(x => "\"%s\"".format(x)) }
-        .map(x => 
s"""{"terms":{"event":[${x.mkString(",")}]}}""")).flatten.mkString(",")
-    val sortOrder = reversed.map(x => x match {
-      case true => "desc"
-      case _ => "asc"
-    })
-    s"""{
-       |"query":{"bool":{"must":[${mustQueries}]}},
-       |"sort":[{"eventTime":{"order":"${sortOrder}"}}]
-       |}""".stripMargin
-  }
-
-  def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = {
-    val hosts = config.properties.get("HOSTS").
-      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
-    val ports = config.properties.get("PORTS").
-      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
-    val schemes = config.properties.get("SCHEMES").
-      map(_.split(",").toSeq).getOrElse(Seq("http"))
-    (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s))
+    results
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
index 647d180..75ac2b0 100644
--- 
a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ 
b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -15,30 +15,36 @@
  * limitations under the License.
  */
 
+
 package org.apache.predictionio.data.storage.elasticsearch
 
-import org.apache.http.HttpHost
+import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.BaseStorageClient
 import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.RestClient
-
-import grizzled.slf4j.Logging
-
-case class ESClient(hosts: Seq[HttpHost]) {
-  def open(): RestClient = {
-    try {
-      RestClient.builder(hosts: _*).build()
-    } catch {
-      case e: Throwable =>
-        throw new StorageClientException(e.getMessage, e)
-    }
-  }
-}
+import org.elasticsearch.client.transport.TransportClient
+import org.elasticsearch.common.settings.ImmutableSettings
+import org.elasticsearch.common.transport.InetSocketTransportAddress
+import org.elasticsearch.transport.ConnectTransportException
 
 class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
     with Logging {
   override val prefix = "ES"
-
-  val client = ESClient(ESUtils.getHttpHosts(config))
+  val client = try {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
+    val settings = ImmutableSettings.settingsBuilder()
+      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", 
"elasticsearch"))
+    val transportClient = new TransportClient(settings)
+    (hosts zip ports) foreach { hp =>
+      transportClient.addTransportAddress(
+        new InetSocketTransportAddress(hp._1, hp._2))
+    }
+    transportClient
+  } catch {
+    case e: ConnectTransportException =>
+      throw new StorageClientException(e.getMessage, e)
+  }
 }


Reply via email to