[PIO-49] Rename Elasticsearch packages Rename elasticsearch1 back to elasticsearch to maintain backward compatibility with existing configuration files
New ES5+ support now lives under elasticsearch5 package Includes a minor fix to the "pio status" output not showing Spark's proper location Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/c64941b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/c64941b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/c64941b6 Branch: refs/heads/feature/es5 Commit: c64941b6e11666ea160f4d19bd4a2302f988d9dd Parents: d039dda Author: Donald Szeto <[email protected]> Authored: Sat Feb 11 13:57:44 2017 -0800 Committer: Donald Szeto <[email protected]> Committed: Sat Feb 11 13:57:44 2017 -0800 ---------------------------------------------------------------------- .travis.yml | 4 +- build.sbt | 4 +- conf/pio-env.sh.template | 18 +- core/build.sbt | 4 +- data/build.sbt | 8 +- .../storage/elasticsearch/ESAccessKeys.scala | 154 ++++------ .../data/storage/elasticsearch/ESApps.scala | 176 ++++-------- .../data/storage/elasticsearch/ESChannels.scala | 144 ++++------ .../elasticsearch/ESEngineInstances.scala | 246 +++++----------- .../elasticsearch/ESEvaluationInstances.scala | 186 +++++------- .../storage/elasticsearch/ESEventsUtil.scala | 125 -------- .../data/storage/elasticsearch/ESLEvents.scala | 286 ------------------- .../data/storage/elasticsearch/ESPEvents.scala | 145 ---------- .../storage/elasticsearch/ESSequences.scala | 71 ++--- .../data/storage/elasticsearch/ESUtils.scala | 157 ++-------- .../storage/elasticsearch/StorageClient.scala | 40 +-- .../storage/elasticsearch1/ESAccessKeys.scala | 119 -------- .../data/storage/elasticsearch1/ESApps.scala | 130 --------- .../storage/elasticsearch1/ESChannels.scala | 117 -------- .../elasticsearch1/ESEngineInstances.scala | 158 ---------- .../elasticsearch1/ESEngineManifests.scala | 84 ------ .../elasticsearch1/ESEvaluationInstances.scala | 136 --------- 
.../storage/elasticsearch1/ESSequences.scala | 64 ----- .../data/storage/elasticsearch1/ESUtils.scala | 48 ---- .../storage/elasticsearch1/StorageClient.scala | 50 ---- .../data/storage/elasticsearch1/package.scala | 25 -- .../storage/elasticsearch5/ESAccessKeys.scala | 175 ++++++++++++ .../data/storage/elasticsearch5/ESApps.scala | 194 +++++++++++++ .../storage/elasticsearch5/ESChannels.scala | 165 +++++++++++ .../elasticsearch5/ESEngineInstances.scala | 248 ++++++++++++++++ .../elasticsearch5/ESEvaluationInstances.scala | 194 +++++++++++++ .../storage/elasticsearch5/ESEventsUtil.scala | 125 ++++++++ .../data/storage/elasticsearch5/ESLEvents.scala | 286 +++++++++++++++++++ .../data/storage/elasticsearch5/ESPEvents.scala | 145 ++++++++++ .../storage/elasticsearch5/ESSequences.scala | 79 +++++ .../data/storage/elasticsearch5/ESUtils.scala | 163 +++++++++++ .../storage/elasticsearch5/StorageClient.scala | 44 +++ .../data/storage/elasticsearch5/package.scala | 25 ++ project/Build.scala | 6 +- .../tools/commands/Management.scala | 2 +- 40 files changed, 2233 insertions(+), 2317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index 4d62999..68dee42 100644 --- a/.travis.yml +++ b/.travis.yml @@ -63,8 +63,8 @@ env: matrix: - BUILD_TYPE=Unit - BUILD_TYPE=Integration METADATA_REP=PGSQL EVENTDATA_REP=PGSQL MODELDATA_REP=PGSQL - - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS - - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH1 EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS + - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=HBASE MODELDATA_REP=LOCALFS + - BUILD_TYPE=Integration METADATA_REP=ELASTICSEARCH EVENTDATA_REP=PGSQL MODELDATA_REP=HDFS before_install: - unset SBT_OPTS JVM_OPTS 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/build.sbt ---------------------------------------------------------------------- diff --git a/build.sbt b/build.sbt index ce626ce..1e9eb8a 100644 --- a/build.sbt +++ b/build.sbt @@ -34,9 +34,9 @@ fork in (ThisBuild, run) := true javacOptions in (ThisBuild, compile) ++= Seq("-source", "1.7", "-target", "1.7", "-Xlint:deprecation", "-Xlint:unchecked") -elasticsearchVersion in ThisBuild := "5.1.2" +elasticsearch5Version in ThisBuild := "5.1.2" -elasticsearch1Version in ThisBuild := "1.7.6" +elasticsearchVersion in ThisBuild := "1.7.6" json4sVersion in ThisBuild := "3.2.10" http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/conf/pio-env.sh.template ---------------------------------------------------------------------- diff --git a/conf/pio-env.sh.template b/conf/pio-env.sh.template index 8f5d7b1..f56f137 100644 --- a/conf/pio-env.sh.template +++ b/conf/pio-env.sh.template @@ -84,17 +84,17 @@ PIO_STORAGE_SOURCES_PGSQL_PASSWORD=pio # PIO_STORAGE_SOURCES_MYSQL_PASSWORD=pio # Elasticsearch Example +# PIO_STORAGE_SOURCES_ELASTICSEARCH5_TYPE=elasticsearch5 +# PIO_STORAGE_SOURCES_ELASTICSEARCH5_HOSTS=localhost +# PIO_STORAGE_SOURCES_ELASTICSEARCH5_PORTS=9200 +# PIO_STORAGE_SOURCES_ELASTICSEARCH5_SCHEMES=http +# PIO_STORAGE_SOURCES_ELASTICSEARCH5_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2 +# Elasticsearch 1.x Example # PIO_STORAGE_SOURCES_ELASTICSEARCH_TYPE=elasticsearch +# PIO_STORAGE_SOURCES_ELASTICSEARCH_CLUSTERNAME=<elasticsearch_cluster_name> # PIO_STORAGE_SOURCES_ELASTICSEARCH_HOSTS=localhost -# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9200 -# PIO_STORAGE_SOURCES_ELASTICSEARCH_SCHEMES=http -# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-5.1.2 -# Elasticsearch 1.x Example -# PIO_STORAGE_SOURCES_ELASTICSEARCH1_TYPE=elasticsearch1 -# PIO_STORAGE_SOURCES_ELASTICSEARCH1_CLUSTERNAME=<elasticsearch_cluster_name> -# 
PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOSTS=localhost -# PIO_STORAGE_SOURCES_ELASTICSEARCH1_PORTS=9300 -# PIO_STORAGE_SOURCES_ELASTICSEARCH1_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_PORTS=9300 +# PIO_STORAGE_SOURCES_ELASTICSEARCH_HOME=$PIO_HOME/vendors/elasticsearch-1.7.6 # Local File System Example # PIO_STORAGE_SOURCES_LOCALFS_TYPE=localfs http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/core/build.sbt ---------------------------------------------------------------------- diff --git a/core/build.sbt b/core/build.sbt index 305075e..b1f589d 100644 --- a/core/build.sbt +++ b/core/build.sbt @@ -32,8 +32,8 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, - "org.elasticsearch" % "elasticsearch" % elasticsearch1Version.value, + "org.elasticsearch.client" % "rest" % elasticsearch5Version.value, + "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value, "org.json4s" %% "json4s-native" % json4sVersion.value, "org.json4s" %% "json4s-ext" % json4sVersion.value, "org.scalaj" %% "scalaj-http" % "1.1.6", http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/build.sbt ---------------------------------------------------------------------- diff --git a/data/build.sbt b/data/build.sbt index 75d3c09..306153a 100644 --- a/data/build.sbt +++ b/data/build.sbt @@ -43,10 +43,10 @@ libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "org.clapper" %% "grizzled-slf4j" % "1.0.2", - "org.elasticsearch.client" % "rest" % elasticsearchVersion.value, - "org.elasticsearch" % "elasticsearch" % elasticsearch1Version.value, - "org.elasticsearch" % 
"elasticsearch-spark-13_2.10" % elasticsearchVersion.value % "provided", - "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearchVersion.value, + "org.elasticsearch.client" % "rest" % elasticsearch5Version.value, + "org.elasticsearch" % "elasticsearch" % elasticsearchVersion.value, + "org.elasticsearch" %% "elasticsearch-spark-13" % elasticsearch5Version.value % "provided", + "org.elasticsearch" % "elasticsearch-hadoop-mr" % elasticsearch5Version.value, "org.json4s" %% "json4s-native" % json4sVersion.value, "org.json4s" %% "json4s-ext" % json4sVersion.value, "org.postgresql" % "postgresql" % "9.4-1204-jdbc41", http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 9156fab..077168a 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -15,45 +15,44 @@ * limitations under the License. 
*/ -package org.apache.predictionio.data.storage.elasticsearch - -import java.io.IOException -import scala.collection.JavaConverters.mapAsJavaMapConverter +package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.entity.ContentType -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.util.EntityUtils +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.predictionio.data.storage.AccessKey import org.apache.predictionio.data.storage.AccessKeys -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient -import org.json4s._ +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ import org.json4s.JsonDSL._ +import org.json4s._ import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException +import scala.util.Random /** Elasticsearch implementation of AccessKeys. 
*/ -class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) +class ESAccessKeys(client: Client, config: StorageClientConfig, index: String) extends AccessKeys with Logging { implicit val formats = DefaultFormats.lossless private val estype = "accesskeys" - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index) - val mappingJson = + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = (estype -> - ("_all" -> ("enabled" -> 0)) ~ ("properties" -> - ("key" -> ("type" -> "keyword")) ~ - ("events" -> ("type" -> "keyword")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() + ("key" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("events" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). 
+ setSource(compact(render(json))).get } def insert(accessKey: AccessKey): Option[String] = { @@ -62,114 +61,59 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) Some(key) } - def get(id: String): Option[AccessKey] = { - val restClient = client.open() + def get(key: String): Option[AccessKey] = { try { - val response = restClient.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "found").extract[Boolean] match { - case true => - Some((jsonResponse \ "_source").extract[AccessKey]) - case _ => - None - } + val response = client.prepareGet( + index, + estype, + key).get() + Some(read[AccessKey](response.getSourceAsString)) } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => None - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - None - } - case e: IOException => - error("Failed to access to /$index/$estype/$key", e) + case e: ElasticsearchException => + error(e.getMessage) None - } finally { - restClient.close() + case e: NullPointerException => None } } def getAll(): Seq[AccessKey] = { - val restClient = client.open() try { - val json = - ("query" -> - ("match_all" -> List.empty)) - ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[AccessKey](client, builder) } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() } } def getByAppid(appid: Int): Seq[AccessKey] = { - val restClient = client.open() try { - val json = - ("query" -> - ("term" -> - ("appid" -> appid))) - ESUtils.getAll[AccessKey](restClient, index, estype, compact(render(json))) + val builder = 
client.prepareSearch(index).setTypes(estype). + setPostFilter(termFilter("appid", appid)) + ESUtils.getAll[AccessKey](client, builder) } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq[AccessKey]() } } def update(accessKey: AccessKey): Unit = { - val id = accessKey.key - val restClient = client.open() try { - val entity = new NStringEntity(write(accessKey), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/$id", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "created" => - case "updated" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + client.prepareIndex(index, estype, accessKey.key).setSource(write(accessKey)).get() } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) } } - def delete(id: String): Unit = { - val restClient = client.open() + def delete(key: String): Unit = { try { - val response = restClient.performRequest( - "DELETE", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val json = parse(EntityUtils.toString(response.getEntity)) - val result = (json \ "result").extract[String] - result match { - case "deleted" => - case _ => - error(s"[$result] Failed to update $index/$estype/id") - } + client.prepareDelete(index, estype, key).get } catch { - case e: IOException => - error(s"Failed to update $index/$estype/id", e) - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) } } } 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index 0379c90..3781a4b 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -15,180 +15,116 @@ * limitations under the License. */ -package org.apache.predictionio.data.storage.elasticsearch - -import java.io.IOException -import scala.collection.JavaConverters.mapAsJavaMapConverter +package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.entity.ContentType -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.util.EntityUtils +import grizzled.slf4j.Logging +import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.predictionio.data.storage.App import org.apache.predictionio.data.storage.Apps -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient -import org.json4s._ +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ import org.json4s.JsonDSL._ +import org.json4s._ import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException - /** Elasticsearch implementation of Items. 
*/ -class ESApps(client: ESClient, config: StorageClientConfig, index: String) - extends Apps with Logging { +class ESApps(client: Client, config: StorageClientConfig, index: String) + extends Apps with Logging { implicit val formats = DefaultFormats.lossless private val estype = "apps" private val seq = new ESSequences(client, config, index) - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index) - val mappingJson = + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = (estype -> - ("_all" -> ("enabled" -> 0)) ~ ("properties" -> - ("id" -> ("type" -> "keyword")) ~ - ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() + ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). 
+ setSource(compact(render(json))).get } def insert(app: App): Option[Int] = { val id = if (app.id == 0) { - var roll = seq.genNext(estype) - while (!get(roll).isEmpty) roll = seq.genNext(estype) + var roll = seq.genNext("apps") + while (!get(roll).isEmpty) roll = seq.genNext("apps") roll - } else app.id - update(app.copy(id = id)) + } + else app.id + val realapp = app.copy(id = id) + update(realapp) Some(id) } def get(id: Int): Option[App] = { - val restClient = client.open() try { - val response = restClient.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "found").extract[Boolean] match { - case true => - Some((jsonResponse \ "_source").extract[App]) - case _ => - None - } + val response = client.prepareGet( + index, + estype, + id.toString).get() + Some(read[App](response.getSourceAsString)) } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => None - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - None - } - case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + case e: ElasticsearchException => + error(e.getMessage) None - } finally { - restClient.close() + case e: NullPointerException => None } } def getByName(name: String): Option[App] = { - val restClient = client.open() try { - val json = - ("query" -> - ("term" -> - ("name" -> name))) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/_search", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "hits" \ "total").extract[Long] match { - case 0 => None - case _ => - val results = (jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] - val result = (results.head \ "_source").extract[App] - Some(result) 
+ val response = client.prepareSearch(index).setTypes(estype). + setPostFilter(termFilter("name", name)).get + val hits = response.getHits().hits() + if (hits.size > 0) { + Some(read[App](hits.head.getSourceAsString)) + } else { + None } } catch { - case e: IOException => - error(s"Failed to access to /$index/$estype/_search", e) + case e: ElasticsearchException => + error(e.getMessage) None - } finally { - restClient.close() } } def getAll(): Seq[App] = { - val restClient = client.open() try { - val json = - ("query" -> - ("match_all" -> List.empty)) - ESUtils.getAll[App](restClient, index, estype, compact(render(json))) + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[App](client, builder) } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq[App]() } } def update(app: App): Unit = { - val id = app.id.toString - val restClient = client.open() try { - val entity = new NStringEntity(write(app), ContentType.APPLICATION_JSON); - val response = restClient.performRequest( - "POST", - s"/$index/$estype/$id", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "created" => - case "updated" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + val response = client.prepareIndex(index, estype, app.id.toString). 
+ setSource(write(app)).get() } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) } } def delete(id: Int): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( - "DELETE", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val json = parse(EntityUtils.toString(response.getEntity)) - val result = (json \ "result").extract[String] - result match { - case "deleted" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + client.prepareDelete(index, estype, id.toString).get } catch { - case e: IOException => - error(s"Failed to update $index/$estype/id", e) - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index b319c26..52697fd 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -15,151 +15,103 @@ * limitations under the License. 
*/ -package org.apache.predictionio.data.storage.elasticsearch - -import java.io.IOException -import scala.collection.JavaConverters.mapAsJavaMapConverter +package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.entity.ContentType -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.util.EntityUtils +import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.Channel import org.apache.predictionio.data.storage.Channels import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient -import org.json4s._ +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders.termFilter +import org.json4s.DefaultFormats import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException - -class ESChannels(client: ESClient, config: StorageClientConfig, index: String) +class ESChannels(client: Client, config: StorageClientConfig, index: String) extends Channels with Logging { + implicit val formats = DefaultFormats.lossless private val estype = "channels" private val seq = new ESSequences(client, config, index) + private val seqName = "channels" - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index) - val mappingJson = + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = (estype -> - ("_all" -> ("enabled" -> 0)) ~ ("properties" -> - ("name" -> ("type" -> "keyword")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } 
finally { - restClient.close() + ("name" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get } def insert(channel: Channel): Option[Int] = { val id = if (channel.id == 0) { - var roll = seq.genNext(estype) - while (!get(roll).isEmpty) roll = seq.genNext(estype) + var roll = seq.genNext(seqName) + while (!get(roll).isEmpty) roll = seq.genNext(seqName) roll } else channel.id - if (update(channel.copy(id = id))) Some(id) else None + val realChannel = channel.copy(id = id) + if (update(realChannel)) Some(id) else None } def get(id: Int): Option[Channel] = { - val restClient = client.open() try { - val response = restClient.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "found").extract[Boolean] match { - case true => - Some((jsonResponse \ "_source").extract[Channel]) - case _ => - None - } + val response = client.prepareGet( + index, + estype, + id.toString).get() + Some(read[Channel](response.getSourceAsString)) } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => None - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - None - } - case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + case e: ElasticsearchException => + error(e.getMessage) None - } finally { - restClient.close() + case e: NullPointerException => None } } def getByAppid(appid: Int): Seq[Channel] = { - val restClient = client.open() try { - val json = - ("query" -> - ("term" -> - ("appid" -> appid))) - ESUtils.getAll[Channel](restClient, index, estype, compact(render(json))) + val builder = client.prepareSearch(index).setTypes(estype). 
+ setPostFilter(termFilter("appid", appid)) + ESUtils.getAll[Channel](client, builder) } catch { - case e: IOException => - error(s"Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq[Channel]() } } def update(channel: Channel): Boolean = { - val id = channel.id.toString - val restClient = client.open() try { - val entity = new NStringEntity(write(channel), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/$id", - Map.empty[String, String].asJava, - entity) - val json = parse(EntityUtils.toString(response.getEntity)) - val result = (json \ "result").extract[String] - result match { - case "created" => true - case "updated" => true - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - false - } + val response = client.prepareIndex(index, estype, channel.id.toString). + setSource(write(channel)).get() + true } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) + case e: ElasticsearchException => + error(e.getMessage) false - } finally { - restClient.close() } } def delete(id: Int): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( - "DELETE", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "deleted" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + client.prepareDelete(index, estype, id.toString).get } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) } } + } 
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index 68cdeac..21690bf 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -15,234 +15,144 @@ * limitations under the License. */ -package org.apache.predictionio.data.storage.elasticsearch - -import java.io.IOException -import scala.collection.JavaConverters.mapAsJavaMapConverter +package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.entity.ContentType -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.util.EntityUtils +import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.EngineInstance import org.apache.predictionio.data.storage.EngineInstanceSerializer import org.apache.predictionio.data.storage.EngineInstances import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient -import org.json4s._ +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.elasticsearch.search.sort.SortOrder import org.json4s.JsonDSL._ +import org.json4s._ import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException - -class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: String) - extends EngineInstances 
with Logging { +class ESEngineInstances(client: Client, config: StorageClientConfig, index: String) + extends EngineInstances with Logging { implicit val formats = DefaultFormats + new EngineInstanceSerializer private val estype = "engine_instances" - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index) - val mappingJson = + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = (estype -> - ("_all" -> ("enabled" -> 0)) ~ ("properties" -> - ("status" -> ("type" -> "keyword")) ~ + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ ("startTime" -> ("type" -> "date")) ~ ("endTime" -> ("type" -> "date")) ~ - ("engineId" -> ("type" -> "keyword")) ~ - ("engineVersion" -> ("type" -> "keyword")) ~ - ("engineVariant" -> ("type" -> "keyword")) ~ - ("engineFactory" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("dataSourceParams" -> ("type" -> "keyword")) ~ - ("preparatorParams" -> ("type" -> "keyword")) ~ - ("algorithmsParams" -> ("type" -> "keyword")) ~ - ("servingParams" -> ("type" -> "keyword")) ~ - ("status" -> ("type" -> "keyword")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() + ("engineId" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineVersion" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineVariant" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineFactory" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("batch" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("dataSourceParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("preparatorParams" -> + ("type" -> "string") ~ ("index" -> 
"not_analyzed")) ~ + ("algorithmsParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("servingParams" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get } def insert(i: EngineInstance): String = { - val id = i.id match { - case x if x.isEmpty => - @scala.annotation.tailrec - def generateId(newId: Option[String]): String = { - newId match { - case Some(x) => x - case _ => generateId(preInsert()) - } - } - generateId(preInsert()) - case x => x - } - - update(i.copy(id = id)) - id - } - - def preInsert(): Option[String] = { - val restClient = client.open() try { - val entity = new NStringEntity("{}", ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "created" => - Some((jsonResponse \ "_id").extract[String]) - case _ => - error(s"[$result] Failed to create $index/$estype") - None - } + val response = client.prepareIndex(index, estype). 
+ setSource(write(i)).get + response.getId } catch { - case e: IOException => - error(s"Failed to create $index/$estype", e) - None - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + "" } } def get(id: String): Option[EngineInstance] = { - val restClient = client.open() try { - val response = restClient.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "found").extract[Boolean] match { - case true => - Some((jsonResponse \ "_source").extract[EngineInstance]) - case _ => - None + val response = client.prepareGet(index, estype, id).get + if (response.isExists) { + Some(read[EngineInstance](response.getSourceAsString)) + } else { + None } } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => None - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - None - } - case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + case e: ElasticsearchException => + error(e.getMessage) None - } finally { - restClient.close() } } def getAll(): Seq[EngineInstance] = { - val restClient = client.open() try { - val json = - ("query" -> - ("match_all" -> List.empty)) - ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[EngineInstance](client, builder) } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq() } } def getCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Seq[EngineInstance] = { - val restClient = client.open() + engineId: String, + engineVersion: String, + engineVariant: String): Seq[EngineInstance] = { try { - val json = - ("query" -> - 
("bool" -> - ("must" -> List( - ("term" -> - ("status" -> "COMPLETED")), - ("term" -> - ("engineId" -> engineId)), - ("term" -> - ("engineVersion" -> engineVersion)), - ("term" -> - ("engineVariant" -> engineVariant)))))) ~ - ("sort" -> List( - ("startTime" -> - ("order" -> "desc")))) - ESUtils.getAll[EngineInstance](restClient, index, estype, compact(render(json))) + val builder = client.prepareSearch(index).setTypes(estype).setPostFilter( + andFilter( + termFilter("status", "COMPLETED"), + termFilter("engineId", engineId), + termFilter("engineVersion", engineVersion), + termFilter("engineVariant", engineVariant))). + addSort("startTime", SortOrder.DESC) + ESUtils.getAll[EngineInstance](client, builder) } catch { - case e: IOException => - error(s"Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq() } } def getLatestCompleted( - engineId: String, - engineVersion: String, - engineVariant: String): Option[EngineInstance] = + engineId: String, + engineVersion: String, + engineVariant: String): Option[EngineInstance] = getCompleted( engineId, engineVersion, engineVariant).headOption def update(i: EngineInstance): Unit = { - val id = i.id - val restClient = client.open() try { - val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/$id", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "created" => - case "updated" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() + case e: ElasticsearchException => error(e.getMessage) } } 
def delete(id: String): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( - "DELETE", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val json = parse(EntityUtils.toString(response.getEntity)) - val result = (json \ "result").extract[String] - result match { - case "deleted" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + val response = client.prepareDelete(index, estype, id).get } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() + case e: ElasticsearchException => error(e.getMessage) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 1f798f0..85bf820 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -15,180 +15,122 @@ * limitations under the License. 
*/ -package org.apache.predictionio.data.storage.elasticsearch - -import java.io.IOException -import scala.collection.JavaConverters._ +package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.entity.ContentType -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.util.EntityUtils +import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.EvaluationInstance import org.apache.predictionio.data.storage.EvaluationInstanceSerializer import org.apache.predictionio.data.storage.EvaluationInstances import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.StorageClientException -import org.elasticsearch.client.RestClient -import org.json4s._ +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client +import org.elasticsearch.index.query.FilterBuilders._ +import org.elasticsearch.search.sort.SortOrder import org.json4s.JsonDSL._ +import org.json4s._ import org.json4s.native.JsonMethods._ +import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write -import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException - -class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index: String) - extends EvaluationInstances with Logging { +class ESEvaluationInstances(client: Client, config: StorageClientConfig, index: String) + extends EvaluationInstances with Logging { implicit val formats = DefaultFormats + new EvaluationInstanceSerializer private val estype = "evaluation_instances" - private val seq = new ESSequences(client, config, index) - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index) - val mappingJson = + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + indices.prepareCreate(index).get + } + val typeExistResponse = 
indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { + val json = (estype -> - ("_all" -> ("enabled" -> 0)) ~ ("properties" -> - ("status" -> ("type" -> "keyword")) ~ + ("status" -> ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ ("startTime" -> ("type" -> "date")) ~ ("endTime" -> ("type" -> "date")) ~ - ("evaluationClass" -> ("type" -> "keyword")) ~ - ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ - ("batch" -> ("type" -> "keyword")) ~ - ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~ - ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~ - ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no")))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() + ("evaluationClass" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("engineParamsGeneratorClass" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("batch" -> + ("type" -> "string") ~ ("index" -> "not_analyzed")) ~ + ("evaluatorResults" -> + ("type" -> "string") ~ ("index" -> "no")) ~ + ("evaluatorResultsHTML" -> + ("type" -> "string") ~ ("index" -> "no")) ~ + ("evaluatorResultsJSON" -> + ("type" -> "string") ~ ("index" -> "no")))) + indices.preparePutMapping(index).setType(estype). + setSource(compact(render(json))).get } def insert(i: EvaluationInstance): String = { - val id = i.id match { - case x if x.isEmpty => - var roll = seq.genNext(estype).toString - while (!get(roll).isEmpty) roll = seq.genNext(estype).toString - roll - case x => x + try { + val response = client.prepareIndex(index, estype). 
+ setSource(write(i)).get + response.getId + } catch { + case e: ElasticsearchException => + error(e.getMessage) + "" } - - update(i.copy(id = id)) - id } def get(id: String): Option[EvaluationInstance] = { - val restClient = client.open() try { - val response = restClient.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "found").extract[Boolean] match { - case true => - Some((jsonResponse \ "_source").extract[EvaluationInstance]) - case _ => - None + val response = client.prepareGet(index, estype, id).get + if (response.isExists) { + Some(read[EvaluationInstance](response.getSourceAsString)) + } else { + None } } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => None - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - None - } - case e: IOException => - error(s"Failed to access to /$index/$estype/$id", e) + case e: ElasticsearchException => + error(e.getMessage) None - } finally { - restClient.close() } } def getAll(): Seq[EvaluationInstance] = { - val restClient = client.open() try { - val json = - ("query" -> - ("match_all" -> List.empty)) - ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + val builder = client.prepareSearch(index).setTypes(estype) + ESUtils.getAll[EvaluationInstance](client, builder) } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq() } } def getCompleted(): Seq[EvaluationInstance] = { - val restClient = client.open() try { - val json = - ("query" -> - ("term" -> - ("status" -> "EVALCOMPLETED"))) ~ - ("sort" -> - ("startTime" -> - ("order" -> "desc"))) - ESUtils.getAll[EvaluationInstance](restClient, index, estype, compact(render(json))) + val builder = 
client.prepareSearch(index).setTypes(estype).setPostFilter( + termFilter("status", "EVALCOMPLETED")). + addSort("startTime", SortOrder.DESC) + ESUtils.getAll[EvaluationInstance](client, builder) } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - Nil - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + Seq() } } def update(i: EvaluationInstance): Unit = { - val id = i.id - val restClient = client.open() try { - val entity = new NStringEntity(write(i), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/$id", - Map.empty[String, String].asJava, - entity) - val json = parse(EntityUtils.toString(response.getEntity)) - val result = (json \ "result").extract[String] - result match { - case "created" => - case "updated" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + client.prepareUpdate(index, estype, i.id).setDoc(write(i)).get } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() + case e: ElasticsearchException => error(e.getMessage) } } def delete(id: String): Unit = { - val restClient = client.open() try { - val response = restClient.performRequest( - "DELETE", - s"/$index/$estype/$id", - Map.empty[String, String].asJava) - val json = parse(EntityUtils.toString(response.getEntity)) - val result = (json \ "result").extract[String] - result match { - case "deleted" => - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - } + client.prepareDelete(index, estype, id).get } catch { - case e: IOException => - error(s"Failed to update $index/$estype/$id", e) - } finally { - restClient.close() + case e: ElasticsearchException => error(e.getMessage) } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala 
---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala deleted file mode 100644 index f2ab7c2..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.predictionio.data.storage.elasticsearch - -import org.apache.hadoop.io.DoubleWritable -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.MapWritable -import org.apache.hadoop.io.Text -import org.apache.predictionio.data.storage.DataMap -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventValidation -import org.joda.time.DateTime -import org.joda.time.DateTimeZone -import org.json4s._ - -object ESEventsUtil { - - implicit val formats = DefaultFormats - - def resultToEvent(id: Text, result: MapWritable, appId: Int): Event = { - - def getStringCol(col: String): String = { - val r = result.get(new Text(col)).asInstanceOf[Text] - require(r != null, - s"Failed to get value for column ${col}. " + - s"StringBinary: ${r.getBytes()}.") - - r.toString() - } - - def getOptStringCol(col: String): Option[String] = { - val r = result.get(new Text(col)) - if (r == null) { - None - } else { - Some(r.asInstanceOf[Text].toString()) - } - } - - val tmp = result - .get(new Text("properties")).asInstanceOf[MapWritable] - .get(new Text("fields")).asInstanceOf[MapWritable] - .get(new Text("rating")) - - val rating = - if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable] - else if (tmp.isInstanceOf[LongWritable]) { - new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble) - } - else null - - val properties: DataMap = - if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""") - else DataMap() - - - val eventId = Some(getStringCol("eventId")) - val event = getStringCol("event") - val entityType = getStringCol("entityType") - val entityId = getStringCol("entityId") - val targetEntityType = getOptStringCol("targetEntityType") - val targetEntityId = getOptStringCol("targetEntityId") - val prId = getOptStringCol("prId") - val eventTimeZone = getOptStringCol("eventTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val 
eventTime = new DateTime( - getStringCol("eventTime"), eventTimeZone) - val creationTimeZone = getOptStringCol("creationTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val creationTime: DateTime = new DateTime( - getStringCol("creationTime"), creationTimeZone) - - - Event( - eventId = eventId, - event = event, - entityType = entityType, - entityId = entityId, - targetEntityType = targetEntityType, - targetEntityId = targetEntityId, - properties = properties, - eventTime = eventTime, - tags = Seq(), - prId = prId, - creationTime = creationTime - ) - } - - def eventToPut(event: Event, appId: Int): Seq[Map[String, Any]] = { - Seq( - Map( - "eventId" -> event.eventId, - "event" -> event.event, - "entityType" -> event.entityType, - "entityId" -> event.entityId, - "targetEntityType" -> event.targetEntityType, - "targetEntityId" -> event.targetEntityId, - "properties" -> event.properties, - "eventTime" -> event.eventTime, - "tags" -> event.tags, - "prId" -> event.prId, - "creationTime" -> event.creationTime - ) - ) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala deleted file mode 100644 index b4f7dc5..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.predictionio.data.storage.elasticsearch - -import java.io.IOException - -import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext -import scala.concurrent.Future - -import org.apache.http.entity.ContentType -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.util.EntityUtils -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.LEvents -import org.apache.predictionio.data.storage.StorageClientConfig -import org.elasticsearch.client.RestClient -import org.joda.time.DateTime -import org.json4s._ -import org.json4s.JsonDSL._ -import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.write -import org.json4s.ext.JodaTimeSerializers - -import grizzled.slf4j.Logging -import org.elasticsearch.client.ResponseException - -class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) - extends LEvents with Logging { - implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all - private val seq = new ESSequences(client, config, index) - private val seqName = "events" - - def getEsType(appId: Int, channelId: Option[Int] = None): String = { - channelId.map { ch => - s"${appId}_${ch}" - }.getOrElse { - s"${appId}" - } - } - - override def init(appId: Int, channelId: Option[Int] = None): Boolean = { - val estype = getEsType(appId, 
channelId) - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index) - val json = - (estype -> - ("_all" -> ("enabled" -> 0)) ~ - ("properties" -> - ("name" -> ("type" -> "keyword")) ~ - ("eventId" -> ("type" -> "keyword")) ~ - ("event" -> ("type" -> "keyword")) ~ - ("entityType" -> ("type" -> "keyword")) ~ - ("entityId" -> ("type" -> "keyword")) ~ - ("targetEntityType" -> ("type" -> "keyword")) ~ - ("targetEntityId" -> ("type" -> "keyword")) ~ - ("properties" -> - ("type" -> "nested") ~ - ("properties" -> - ("fields" -> ("type" -> "nested") ~ - ("properties" -> - ("user" -> ("type" -> "long")) ~ - ("num" -> ("type" -> "long")))))) ~ - ("eventTime" -> ("type" -> "date")) ~ - ("tags" -> ("type" -> "keyword")) ~ - ("prId" -> ("type" -> "keyword")) ~ - ("creationTime" -> ("type" -> "date")))) - ESUtils.createMapping(restClient, index, estype, compact(render(json))) - } finally { - restClient.close() - } - true - } - - override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { - val estype = getEsType(appId, channelId) - val restClient = client.open() - try { - val json = - ("query" -> - ("match_all" -> List.empty)) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - restClient.performRequest( - "POST", - s"/$index/$estype/_delete_by_query", - Map.empty[String, String].asJava, - entity).getStatusLine.getStatusCode match { - case 200 => true - case _ => - error(s"Failed to remove $index/$estype") - false - } - } catch { - case e: Exception => - error(s"Failed to remove $index/$estype", e) - false - } finally { - restClient.close() - } - } - - override def close(): Unit = { - // nothing - } - - override def futureInsert( - event: Event, - appId: Int, - channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { - Future { - val estype = getEsType(appId, channelId) - val restClient = client.open() - try { - val id = event.eventId.getOrElse { - var roll = seq.genNext(seqName) - 
while (exists(restClient, estype, roll)) roll = seq.genNext(seqName) - roll.toString - } - val json = write(event.copy(eventId = Some(id))) - val entity = new NStringEntity(json, ContentType.APPLICATION_JSON); - val response = restClient.performRequest( - "POST", - s"/$index/$estype/$id", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "created" => id - case "updated" => id - case _ => - error(s"[$result] Failed to update $index/$estype/$id") - "" - } - } catch { - case e: IOException => - error(s"Failed to update $index/$estype/<id>", e) - "" - } finally { - restClient.close() - } - } - } - - private def exists(restClient: RestClient, estype: String, id: Int): Boolean = { - try { - restClient.performRequest( - "GET", - s"/$index/$estype/$id", - Map.empty[String, String].asJava).getStatusLine.getStatusCode match { - case 200 => true - case _ => false - } - } catch { - case e: ResponseException => - e.getResponse.getStatusLine.getStatusCode match { - case 404 => false - case _ => - error(s"Failed to access to /$index/$estype/$id", e) - false - } - case e: IOException => - error(s"Failed to access to $index/$estype/$id", e) - false - } - } - - override def futureGet( - eventId: String, - appId: Int, - channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { - Future { - val estype = getEsType(appId, channelId) - val restClient = client.open() - try { - val json = - ("query" -> - ("term" -> - ("eventId" -> eventId))) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/_search", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - (jsonResponse \ "hits" \ "total").extract[Long] match { - case 0 => None - case _ => - val results = 
(jsonResponse \ "hits" \ "hits").extract[Seq[JValue]] - val result = (results.head \ "_source").extract[Event] - Some(result) - } - } catch { - case e: IOException => - error("Failed to access to /$index/$estype/_search", e) - None - } finally { - restClient.close() - } - } - } - - override def futureDelete( - eventId: String, - appId: Int, - channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { - Future { - val estype = getEsType(appId, channelId) - val restClient = client.open() - try { - val json = - ("query" -> - ("term" -> - ("eventId" -> eventId))) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/_delete_by_query", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "deleted" => true - case _ => - error(s"[$result] Failed to update $index/$estype:$eventId") - false - } - } catch { - case e: IOException => - error(s"Failed to update $index/$estype:$eventId", e) - false - } finally { - restClient.close() - } - } - } - - override def futureFind( - appId: Int, - channelId: Option[Int] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None, - limit: Option[Int] = None, - reversed: Option[Boolean] = None) - (implicit ec: ExecutionContext): Future[Iterator[Event]] = { - Future { - val estype = getEsType(appId, channelId) - val restClient = client.open() - try { - val query = ESUtils.createEventQuery( - startTime, untilTime, entityType, entityId, - eventNames, targetEntityType, targetEntityId, None) - ESUtils.getAll[Event](restClient, index, estype, query).toIterator 
- } catch { - case e: IOException => - error(e.getMessage) - Iterator[Event]() - } finally { - restClient.close() - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala deleted file mode 100644 index 5784b3f..0000000 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.predictionio.data.storage.elasticsearch - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.MapWritable -import org.apache.hadoop.io.Text -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.PEvents -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD -import org.elasticsearch.client.RestClient -import org.elasticsearch.hadoop.mr.EsInputFormat -import org.elasticsearch.spark._ -import org.joda.time.DateTime -import java.io.IOException -import org.apache.http.util.EntityUtils -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.entity.ContentType -import org.json4s._ -import org.json4s.JsonDSL._ -import org.json4s.native.JsonMethods._ -import org.json4s.ext.JodaTimeSerializers - - -class ESPEvents(client: ESClient, config: StorageClientConfig, index: String) - extends PEvents { - implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all - - def getEsType(appId: Int, channelId: Option[Int] = None): String = { - channelId.map { ch => - s"${appId}_${ch}" - }.getOrElse { - s"${appId}" - } - } - - def getESNodes(): String = { - val hosts = config.properties.get("HOSTS"). - map(_.split(",").toSeq).getOrElse(Seq("localhost")) - val ports = config.properties.get("PORTS"). - map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200)) - val schemes = config.properties.get("SCHEMES"). 
- map(_.split(",").toSeq).getOrElse(Seq("http")) - (hosts, ports, schemes).zipped.map( - (h, p, s) => s"$s://$h:$p").mkString(",") - } - - override def find( - appId: Int, - channelId: Option[Int] = None, - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = { - - val query = ESUtils.createEventQuery( - startTime, untilTime, entityType, entityId, - eventNames, targetEntityType, targetEntityId, None) - - val estype = getEsType(appId, channelId) - val conf = new Configuration() - conf.set("es.resource", s"$index/$estype") - conf.set("es.query", query) - conf.set("es.nodes", getESNodes()) - - val rdd = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]], - classOf[Text], classOf[MapWritable]).map { - case (key, doc) => { - ESEventsUtil.resultToEvent(key, doc, appId) - } - } - - rdd - } - - override def write( - events: RDD[Event], - appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { - val estype = getEsType(appId, channelId) - events.map { event => - ESEventsUtil.eventToPut(event, appId) - }.saveToEs(s"$index/$estype") - } - - override def delete( - eventIds: RDD[String], - appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { - val estype = getEsType(appId, channelId) - val restClient = client.open() - try { - eventIds.foreachPartition { iter => - iter.foreach { eventId => - try { - val json = - ("query" -> - ("term" -> - ("eventId" -> eventId))) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/_delete_by_query", - Map.empty[String, String].asJava) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ 
"result").extract[String] - result match { - case "deleted" => true - case _ => - logger.error(s"[$result] Failed to update $index/$estype:$eventId") - false - } - } catch { - case e: IOException => - logger.error(s"Failed to update $index/$estype:$eventId", e) - false - } - } - } - } finally { - restClient.close() - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index 4eb8cd7..5c9e170 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -15,65 +15,50 @@ * limitations under the License. 
*/ -package org.apache.predictionio.data.storage.elasticsearch - -import java.io.IOException -import scala.collection.JavaConverters._ +package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.Header -import org.apache.http.entity.ContentType -import org.apache.http.nio.entity.NStringEntity -import org.apache.http.util.EntityUtils +import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.predictionio.data.storage.StorageClientException -import org.elasticsearch.client.RestClient -import org.json4s._ +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.client.Client import org.json4s.JsonDSL._ +import org.json4s._ import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.write - -import grizzled.slf4j.Logging -class ESSequences(client: ESClient, config: StorageClientConfig, index: String) extends Logging { +class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging { implicit val formats = DefaultFormats private val estype = "sequences" - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index) + val indices = client.admin.indices + val indexExistResponse = indices.prepareExists(index).get + if (!indexExistResponse.isExists) { + // val settingsJson = + // ("number_of_shards" -> 1) ~ + // ("auto_expand_replicas" -> "0-all") + indices.prepareCreate(index).get + } + val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get + if (!typeExistResponse.isExists) { val mappingJson = (estype -> - ("_all" -> ("enabled" -> 0))) - ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) - } finally { - restClient.close() + ("_source" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> 0)) ~ + ("_type" -> ("index" -> "no")) ~ + ("enabled" -> 0)) + indices.preparePutMapping(index).setType(estype). 
+ setSource(compact(render(mappingJson))).get } def genNext(name: String): Int = { - val restClient = client.open() try { - val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON) - val response = restClient.performRequest( - "POST", - s"/$index/$estype/$name", - Map.empty[String, String].asJava, - entity) - val jsonResponse = parse(EntityUtils.toString(response.getEntity)) - val result = (jsonResponse \ "result").extract[String] - result match { - case "created" => - (jsonResponse \ "_version").extract[Int] - case "updated" => - (jsonResponse \ "_version").extract[Int] - case _ => - throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name") - } + val response = client.prepareIndex(index, estype, name). + setSource(compact(render("n" -> name))).get + response.getVersion().toInt } catch { - case e: IOException => - throw new StorageClientException(s"Failed to update $index/$estype/$name", e) - } finally { - restClient.close() + case e: ElasticsearchException => + error(e.getMessage) + 0 } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index db841b6..f5c99bf 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -15,149 +15,34 @@ * limitations under the License. 
*/ -package org.apache.predictionio.data.storage.elasticsearch -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ +package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.entity.ContentType -import org.apache.http.entity.StringEntity -import org.apache.http.nio.entity.NStringEntity -import org.elasticsearch.client.RestClient -import org.json4s._ -import org.json4s.JsonDSL._ -import org.json4s.native.JsonMethods._ +import org.elasticsearch.action.search.SearchRequestBuilder +import org.elasticsearch.client.Client +import org.elasticsearch.common.unit.TimeValue +import org.json4s.Formats import org.json4s.native.Serialization.read -import org.apache.http.util.EntityUtils -import org.joda.time.DateTime -import org.joda.time.format.DateTimeFormat -import org.joda.time.DateTimeZone -import org.apache.predictionio.data.storage.StorageClientConfig -import org.apache.http.HttpHost + +import scala.collection.mutable.ArrayBuffer object ESUtils { - val scrollLife = "1m" + val scrollLife = new TimeValue(60000) - def getAll[T: Manifest]( - client: RestClient, - index: String, - estype: String, - query: String)( + def getAll[T : Manifest]( + client: Client, + builder: SearchRequestBuilder)( implicit formats: Formats): Seq[T] = { - - @scala.annotation.tailrec - def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = { - if (hits.isEmpty) results - else { - val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId) - val scrollBody = new StringEntity(compact(render(json))) - val response = client.performRequest( - "POST", - "/_search/scroll", - Map[String, String](), - scrollBody) - val responseJValue = parse(EntityUtils.toString(response.getEntity)) - scroll((responseJValue \ "_scroll_id").extract[String], - (responseJValue \ "hits" \ "hits").extract[Seq[JValue]], - hits.map(h => (h \ "_source").extract[T]) ++ results) - } + val results = ArrayBuffer[T]() + var response = 
builder.setScroll(scrollLife).get + var hits = response.getHits().hits() + results ++= hits.map(h => read[T](h.getSourceAsString)) + while (hits.size > 0) { + response = client.prepareSearchScroll(response.getScrollId). + setScroll(scrollLife).get + hits = response.getHits().hits() + results ++= hits.map(h => read[T](h.getSourceAsString)) } - - val response = client.performRequest( - "POST", - s"/$index/$estype/_search", - Map("scroll" -> scrollLife), - new StringEntity(query)) - val responseJValue = parse(EntityUtils.toString(response.getEntity)) - scroll((responseJValue \ "_scroll_id").extract[String], - (responseJValue \ "hits" \ "hits").extract[Seq[JValue]], - Nil) - } - - def createIndex( - client: RestClient, - index: String): Unit = { - client.performRequest( - "HEAD", - s"/$index", - Map.empty[String, String].asJava).getStatusLine.getStatusCode match { - case 404 => - client.performRequest( - "PUT", - s"/$index", - Map.empty[String, String].asJava) - case 200 => - case _ => - throw new IllegalStateException(s"/$index is invalid.") - } - } - - def createMapping( - client: RestClient, - index: String, - estype: String, - json: String): Unit = { - client.performRequest( - "HEAD", - s"/$index/_mapping/$estype", - Map.empty[String, String].asJava).getStatusLine.getStatusCode match { - case 404 => - val entity = new NStringEntity(json, ContentType.APPLICATION_JSON) - client.performRequest( - "PUT", - s"/$index/_mapping/$estype", - Map.empty[String, String].asJava, - entity) - case 200 => - case _ => - throw new IllegalStateException(s"/$index/$estype is invalid: $json") - } - } - - def createEventQuery( - startTime: Option[DateTime] = None, - untilTime: Option[DateTime] = None, - entityType: Option[String] = None, - entityId: Option[String] = None, - eventNames: Option[Seq[String]] = None, - targetEntityType: Option[Option[String]] = None, - targetEntityId: Option[Option[String]] = None, - reversed: Option[Boolean] = None): String = { - val mustQueries = Seq( - 
startTime.map(x => { - val v = DateTimeFormat - .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC)) - s"""{"range":{"eventTime":{"gte":"${v}"}}}""" - }), - untilTime.map(x => { - val v = DateTimeFormat - .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC)) - s"""{"range":{"eventTime":{"gte":"${v}"}}}""" - }), - entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""), - entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""), - targetEntityType.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityType":"${x}"}}""")), - targetEntityId.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityId":"${x}"}}""")), - eventNames - .map { xx => xx.map(x => "\"%s\"".format(x)) } - .map(x => s"""{"terms":{"event":[${x.mkString(",")}]}}""")).flatten.mkString(",") - val sortOrder = reversed.map(x => x match { - case true => "desc" - case _ => "asc" - }) - s"""{ - |"query":{"bool":{"must":[${mustQueries}]}}, - |"sort":[{"eventTime":{"order":"${sortOrder}"}}] - |}""".stripMargin - } - - def getHttpHosts(config: StorageClientConfig): Seq[HttpHost] = { - val hosts = config.properties.get("HOSTS"). - map(_.split(",").toSeq).getOrElse(Seq("localhost")) - val ports = config.properties.get("PORTS"). - map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200)) - val schemes = config.properties.get("SCHEMES"). 
- map(_.split(",").toSeq).getOrElse(Seq("http")) - (hosts, ports, schemes).zipped.map((h, p, s) => new HttpHost(h, p, s)) + results } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/c64941b6/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala index 647d180..75ac2b0 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala @@ -15,30 +15,36 @@ * limitations under the License. */ + package org.apache.predictionio.data.storage.elasticsearch -import org.apache.http.HttpHost +import grizzled.slf4j.Logging import org.apache.predictionio.data.storage.BaseStorageClient import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.predictionio.data.storage.StorageClientException -import org.elasticsearch.client.RestClient - -import grizzled.slf4j.Logging - -case class ESClient(hosts: Seq[HttpHost]) { - def open(): RestClient = { - try { - RestClient.builder(hosts: _*).build() - } catch { - case e: Throwable => - throw new StorageClientException(e.getMessage, e) - } - } -} +import org.elasticsearch.client.transport.TransportClient +import org.elasticsearch.common.settings.ImmutableSettings +import org.elasticsearch.common.transport.InetSocketTransportAddress +import org.elasticsearch.transport.ConnectTransportException class StorageClient(val config: StorageClientConfig) extends BaseStorageClient with Logging { override val prefix = "ES" - - val client = ESClient(ESUtils.getHttpHosts(config)) + val client = try { + val hosts = config.properties.get("HOSTS"). 
+ map(_.split(",").toSeq).getOrElse(Seq("localhost")) + val ports = config.properties.get("PORTS"). + map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300)) + val settings = ImmutableSettings.settingsBuilder() + .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch")) + val transportClient = new TransportClient(settings) + (hosts zip ports) foreach { hp => + transportClient.addTransportAddress( + new InetSocketTransportAddress(hp._1, hp._2)) + } + transportClient + } catch { + case e: ConnectTransportException => + throw new StorageClientException(e.getMessage, e) + } }
