Repository: incubator-predictionio
Updated Branches:
  refs/heads/feature/es5 [created] c64941b6e
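This branch moves the Elasticsearch storage layer off the ES 1.x `TransportClient` (native transport, port 9300) and onto the ES 5.x low-level REST client (HTTP, port 9200), so every storage operation below becomes a plain HTTP request. As a rough sketch of the new request style — the host, port, and endpoint here are illustrative only, not taken from the commit:

    import org.apache.http.HttpHost
    import org.apache.http.util.EntityUtils
    import org.elasticsearch.client.RestClient

    // Illustrative connection settings; the real values come from the
    // HOSTS/PORTS/SCHEMES properties read by ESUtils.createRestClient below.
    val client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()
    try {
      // Each operation is now an HTTP verb plus endpoint instead of a transport action.
      val response = client.performRequest("GET", "/_cluster/health")
      println(EntityUtils.toString(response.getEntity))
    } finally {
      client.close()
    }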
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
new file mode 100644
index 0000000..0e3eec8
--- /dev/null
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.predictionio.data.storage.elasticsearch
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.MapWritable
+import org.apache.hadoop.io.Text
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.storage.PEvents
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.elasticsearch.client.RestClient
+import org.elasticsearch.hadoop.mr.EsInputFormat
+import org.elasticsearch.spark._
+import org.joda.time.DateTime
+import java.io.IOException
+import org.apache.http.util.EntityUtils
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.entity.ContentType
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.ext.JodaTimeSerializers
+
+class ESPEvents(client: RestClient, config: StorageClientConfig, index: String)
+    extends PEvents {
+  implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all
+
+  // client is not used.
+  try client.close() catch {
+    case e: Exception =>
+      logger.error("Failed to close client.", e)
+  }
+
+  def getEsType(appId: Int, channelId: Option[Int] = None): String = {
+    channelId.map { ch =>
+      s"${appId}_${ch}"
+    }.getOrElse {
+      s"${appId}"
+    }
+  }
+
+  def getESNodes(): String = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    (hosts, ports, schemes).zipped.map(
+      (h, p, s) => s"$s://$h:$p").mkString(",")
+  }
+
+  override def find(
+    appId: Int,
+    channelId: Option[Int] = None,
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None)(sc: SparkContext): RDD[Event] = {
+
+    val query = ESUtils.createEventQuery(
+      startTime, untilTime, entityType, entityId,
+      eventNames, targetEntityType, targetEntityId, None)
+
+    val estype = getEsType(appId, channelId)
+    val conf = new Configuration()
+    conf.set("es.resource", s"$index/$estype")
+    conf.set("es.query", query)
+    conf.set("es.nodes", getESNodes())
+
+    val rdd = sc.newAPIHadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
+      classOf[Text], classOf[MapWritable]).map {
+        case (key, doc) => {
+          ESEventsUtil.resultToEvent(key, doc, appId)
+        }
+      }
+
+    rdd
+  }
+
+  override def write(
+    events: RDD[Event],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    events.map { event =>
+      ESEventsUtil.eventToPut(event, appId)
+    }.saveToEs(s"$index/$estype")
+  }
+
+  override def delete(
+    eventIds: RDD[String],
+    appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = {
+    val estype = getEsType(appId, channelId)
+    eventIds.foreachPartition { iter =>
+      // RestClient is not serializable, so open one per partition on the
+      // executor rather than sharing a client created on the driver.
+      val restClient = ESUtils.createRestClient(config)
+      try {
+        iter.foreach { eventId =>
+          try {
+            val json =
+              ("query" ->
+                ("term" ->
+                  ("eventId" -> eventId)))
+            val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
+            // Send the query as the request body; without it, _delete_by_query
+            // has nothing to match.
+            val response = restClient.performRequest(
+              "POST",
+              s"/$index/$estype/_delete_by_query",
+              Map.empty[String, String].asJava,
+              entity)
+            val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+            // _delete_by_query reports a "deleted" count rather than a "result" field.
+            val deleted = (jsonResponse \ "deleted") match {
+              case JInt(n) => n.toInt
+              case _ => 0
+            }
+            if (deleted < 1) {
+              logger.error(s"Failed to delete $index/$estype:$eventId")
+            }
+          } catch {
+            case e: IOException =>
+              logger.error(s"Failed to delete $index/$estype:$eventId", e)
+          }
+        }
+      } finally {
+        restClient.close()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
index 5c9e170..c067f3a 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala
@@ -15,50 +15,57 @@ * limitations under the License.
 */
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import grizzled.slf4j.Logging
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
 import org.apache.predictionio.data.storage.StorageClientConfig
-import org.elasticsearch.ElasticsearchException
-import org.elasticsearch.client.Client
-import org.json4s.JsonDSL._
+import org.apache.predictionio.data.storage.StorageClientException
+import org.elasticsearch.client.RestClient
 import org.json4s._
+import org.json4s.JsonDSL._
 import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.write
+
+import grizzled.slf4j.Logging
 
-class ESSequences(client: Client, config: StorageClientConfig, index: String) extends Logging {
+class ESSequences(client: RestClient, config: StorageClientConfig, index: String) extends Logging {
   implicit val formats = DefaultFormats
   private val estype = "sequences"
 
-  val indices = client.admin.indices
-  val indexExistResponse = indices.prepareExists(index).get
-  if (!indexExistResponse.isExists) {
-    // val settingsJson =
-    //   ("number_of_shards" -> 1) ~
-    //   ("auto_expand_replicas" -> "0-all")
-    indices.prepareCreate(index).get
-  }
-  val typeExistResponse = indices.prepareTypesExists(index).setTypes(estype).get
-  if (!typeExistResponse.isExists) {
-    val mappingJson =
-      (estype ->
-        ("_source" -> ("enabled" -> 0)) ~
-        ("_all" -> ("enabled" -> 0)) ~
-        ("_type" -> ("index" -> "no")) ~
-        ("enabled" -> 0))
-    indices.preparePutMapping(index).setType(estype).
-      setSource(compact(render(mappingJson))).get
-  }
+  ESUtils.createIndex(client, index)
+  val mappingJson =
+    (estype ->
+      ("_all" -> ("enabled" -> 0)))
+  ESUtils.createMapping(client, index, estype, compact(render(mappingJson)))
 
   def genNext(name: String): Int = {
     try {
-      val response = client.prepareIndex(index, estype, name).
-        setSource(compact(render("n" -> name))).get
-      response.getVersion().toInt
+      val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON)
+      val response = client.performRequest(
+        "POST",
+        s"/$index/$estype/$name",
+        Map.empty[String, String].asJava,
+        entity)
+      val jsonResponse = parse(EntityUtils.toString(response.getEntity))
+      val result = (jsonResponse \ "result").extract[String]
+      result match {
+        case "created" =>
+          (jsonResponse \ "_version").extract[Int]
+        case "updated" =>
+          (jsonResponse \ "_version").extract[Int]
+        case _ =>
+          throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name")
+      }
     } catch {
-      case e: ElasticsearchException =>
-        error(e.getMessage)
-        0
+      case e: IOException =>
+        throw new StorageClientException(s"Failed to update $index/$estype/$name", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
index f5c99bf..68e3f57 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala
@@ -15,34 +15,151 @@ * limitations under the License.
 */
-
 package org.apache.predictionio.data.storage.elasticsearch
 
-import org.elasticsearch.action.search.SearchRequestBuilder
-import org.elasticsearch.client.Client
-import org.elasticsearch.common.unit.TimeValue
-import org.json4s.Formats
-import org.json4s.native.Serialization.read
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
-import scala.collection.mutable.ArrayBuffer
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.nio.entity.NStringEntity
+import org.elasticsearch.client.RestClient
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.native.JsonMethods._
+import org.json4s.native.Serialization.read
+import org.apache.http.util.EntityUtils
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.DateTimeZone
+import org.apache.predictionio.data.storage.StorageClientConfig
+import org.apache.http.HttpHost
 
 object ESUtils {
-  val scrollLife = new TimeValue(60000)
+  val scrollLife = "1m"
 
-  def getAll[T : Manifest](
-    client: Client,
-    builder: SearchRequestBuilder)(
+  def getAll[T: Manifest](
+    client: RestClient,
+    index: String,
+    estype: String,
+    query: String)(
     implicit formats: Formats): Seq[T] = {
-    val results = ArrayBuffer[T]()
-    var response = builder.setScroll(scrollLife).get
-    var hits = response.getHits().hits()
-    results ++= hits.map(h => read[T](h.getSourceAsString))
-    while (hits.size > 0) {
-      response = client.prepareSearchScroll(response.getScrollId).
-        setScroll(scrollLife).get
-      hits = response.getHits().hits()
-      results ++= hits.map(h => read[T](h.getSourceAsString))
+
+    @scala.annotation.tailrec
+    def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = {
+      if (hits.isEmpty) results
+      else {
+        val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId)
+        val scrollBody = new StringEntity(compact(render(json)))
+        val response = client.performRequest(
+          "POST",
+          "/_search/scroll",
+          Map[String, String](),
+          scrollBody)
+        val responseJValue = parse(EntityUtils.toString(response.getEntity))
+        // Append the current page to the accumulator so the query's sort
+        // order is preserved across scroll pages.
+        scroll((responseJValue \ "_scroll_id").extract[String],
+          (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+          results ++ hits.map(h => (h \ "_source").extract[T]))
+      }
     }
-    results
+
+    val response = client.performRequest(
+      "POST",
+      s"/$index/$estype/_search",
+      Map("scroll" -> scrollLife),
+      new StringEntity(query))
+    val responseJValue = parse(EntityUtils.toString(response.getEntity))
+    scroll((responseJValue \ "_scroll_id").extract[String],
+      (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
+      Nil)
+  }
+
+  def createIndex(
+    client: RestClient,
+    index: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+      case 404 =>
+        client.performRequest(
+          "PUT",
+          s"/$index",
+          Map.empty[String, String].asJava)
+      case 200 =>
+      case _ =>
+        throw new IllegalStateException(s"/$index is invalid.")
+    }
+  }
+
+  def createMapping(
+    client: RestClient,
+    index: String,
+    estype: String,
+    json: String): Unit = {
+    client.performRequest(
+      "HEAD",
+      s"/$index/_mapping/$estype",
+      Map.empty[String, String].asJava).getStatusLine.getStatusCode match {
+      case 404 =>
+        val entity = new NStringEntity(json, ContentType.APPLICATION_JSON)
+        client.performRequest(
+          "PUT",
+          s"/$index/_mapping/$estype",
+          Map.empty[String, String].asJava,
+          entity)
+      case 200 =>
+      case _ =>
+        throw new IllegalStateException(s"/$index/$estype is invalid: $json")
+    }
+  }
+
+  def createEventQuery(
+    startTime: Option[DateTime] = None,
+    untilTime: Option[DateTime] = None,
+    entityType: Option[String] = None,
+    entityId: Option[String] = None,
+    eventNames: Option[Seq[String]] = None,
+    targetEntityType: Option[Option[String]] = None,
+    targetEntityId: Option[Option[String]] = None,
+    reversed: Option[Boolean] = None): String = {
+    val mustQueries = Seq(
+      startTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"gte":"${v}"}}}"""
+      }),
+      untilTime.map(x => {
+        val v = DateTimeFormat
+          .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC))
+        s"""{"range":{"eventTime":{"lt":"${v}"}}}"""
+      }),
+      entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),
+      entityId.map(x => s"""{"term":{"entityId":"${x}"}}"""),
+      targetEntityType.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityType":"${x}"}}""")),
+      targetEntityId.flatMap(xx => xx.map(x => s"""{"term":{"targetEntityId":"${x}"}}""")),
+      eventNames
+        .map { xx => xx.map(x => "\"%s\"".format(x)) }
+        .map(x => s"""{"terms":{"event":[${x.mkString(",")}]}}""")).flatten.mkString(",")
+    // Unwrap the Option so the JSON gets "asc"/"desc", not "Some(...)".
+    val sortOrder = reversed.map {
+      case true => "desc"
+      case _ => "asc"
+    }.getOrElse("asc")
+    s"""{
+       |"query":{"bool":{"must":[${mustQueries}]}},
+       |"sort":[{"eventTime":{"order":"${sortOrder}"}}]
+       |}""".stripMargin
+  }
+
+  def createRestClient(config: StorageClientConfig): RestClient = {
+    val hosts = config.properties.get("HOSTS").
+      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
+    val ports = config.properties.get("PORTS").
+      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9200))
+    val schemes = config.properties.get("SCHEMES").
+      map(_.split(",").toSeq).getOrElse(Seq("http"))
+    val httpHosts = (hosts, ports, schemes).zipped.map(
+      (h, p, s) => new HttpHost(h, p, s))
+    RestClient.builder(httpHosts: _*).build()
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
----------------------------------------------------------------------
diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
index 75ac2b0..912d467 100644
--- a/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
+++ b/data/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
@@ -15,36 +15,24 @@ * limitations under the License.
 */
-
 package org.apache.predictionio.data.storage.elasticsearch
 
 import grizzled.slf4j.Logging
 import org.apache.predictionio.data.storage.BaseStorageClient
 import org.apache.predictionio.data.storage.StorageClientConfig
 import org.apache.predictionio.data.storage.StorageClientException
-import org.elasticsearch.client.transport.TransportClient
-import org.elasticsearch.common.settings.ImmutableSettings
-import org.elasticsearch.common.transport.InetSocketTransportAddress
-import org.elasticsearch.transport.ConnectTransportException
+import org.elasticsearch.client.RestClient
+import org.apache.http.HttpHost
 
 class StorageClient(val config: StorageClientConfig) extends BaseStorageClient
     with Logging {
   override val prefix = "ES"
+
   val client = try {
-    val hosts = config.properties.get("HOSTS").
-      map(_.split(",").toSeq).getOrElse(Seq("localhost"))
-    val ports = config.properties.get("PORTS").
-      map(_.split(",").toSeq.map(_.toInt)).getOrElse(Seq(9300))
-    val settings = ImmutableSettings.settingsBuilder()
-      .put("cluster.name", config.properties.getOrElse("CLUSTERNAME", "elasticsearch"))
-    val transportClient = new TransportClient(settings)
-    (hosts zip ports) foreach { hp =>
-      transportClient.addTransportAddress(
-        new InetSocketTransportAddress(hp._1, hp._2))
-    }
-    transportClient
+    ESUtils.createRestClient(config)
   } catch {
-    case e: ConnectTransportException =>
+    case e: Throwable =>
       throw new StorageClientException(e.getMessage, e)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/36b79d7d/tools/build.sbt
----------------------------------------------------------------------
diff --git a/tools/build.sbt b/tools/build.sbt
index fefdb45..4e2b266 100644
--- a/tools/build.sbt
+++ b/tools/build.sbt
@@ -42,6 +42,7 @@ dependencyOverrides += "org.slf4j" % "slf4j-log4j12" % "1.7.18"
 assemblyMergeStrategy in assembly := {
   case PathList("META-INF", "LICENSE.txt") => MergeStrategy.concat
   case PathList("META-INF", "NOTICE.txt") => MergeStrategy.concat
+  case PathList("org", "joda", "time", "base", "BaseDateTime.class") => MergeStrategy.first
   case x =>
     val oldStrategy = (assemblyMergeStrategy in assembly).value
     oldStrategy(x)
@@ -62,7 +63,8 @@ excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
 assemblyShadeRules in assembly := Seq(
   ShadeRule.rename("org.objenesis.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject,
   ShadeRule.rename("com.esotericsoftware.reflectasm.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject,
-  ShadeRule.rename("com.esotericsoftware.minlog.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject
+  ShadeRule.rename("com.esotericsoftware.minlog.**" -> "shadeio.@1").inLibrary("com.esotericsoftware.kryo" % "kryo" % "2.21").inProject,
+  ShadeRule.rename("org.apache.http.**" -> "shadeio.http.@1").inAll
 )

 // skip test in assembly
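For reviewers, a minimal sketch of how the new `ESUtils` pieces compose — this is not part of the commit. The index name `pio_event`, the type `"1"` (which `getEsType` derives from app id 1), and the `Doc` case class are hypothetical; reading real `Event` documents back would additionally go through `ESEventsUtil`:

    import org.apache.predictionio.data.storage.StorageClientConfig
    import org.apache.predictionio.data.storage.elasticsearch.ESUtils
    import org.joda.time.DateTime
    import org.json4s.DefaultFormats

    object ESUtilsSketch {
      // Minimal document shape, purely for illustration.
      case class Doc(entityId: String, event: String)

      def main(args: Array[String]): Unit = {
        implicit val formats = DefaultFormats

        // Hypothetical settings; createRestClient falls back to
        // localhost:9200 over http when HOSTS/PORTS/SCHEMES are absent.
        val config = StorageClientConfig(false, false, Map("HOSTS" -> "localhost"))
        val client = ESUtils.createRestClient(config)
        try {
          // Build the JSON body for a filtered, sorted scan...
          val query = ESUtils.createEventQuery(
            startTime = Some(DateTime.now().minusDays(1)),
            eventNames = Some(Seq("view", "buy")))
          // ...and drain every matching hit through the scroll API.
          val docs: Seq[Doc] = ESUtils.getAll[Doc](client, "pio_event", "1", query)
          docs.foreach(d => println(s"${d.entityId} ${d.event}"))
        } finally {
          client.close()
        }
      }
    }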
