Repository: incubator-predictionio Updated Branches: refs/heads/develop 463939348 -> ae51040ba
Update event serialization for ES5 Closes #358 Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/ae51040b Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/ae51040b Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/ae51040b Branch: refs/heads/develop Commit: ae51040baaebd30608198841b7d4caee2c4c5cd0 Parents: 4639393 Author: Shinsuke Sugaya <[email protected]> Authored: Tue Mar 14 15:08:33 2017 -0700 Committer: Donald Szeto <[email protected]> Committed: Tue Mar 14 15:08:33 2017 -0700 ---------------------------------------------------------------------- .../storage/elasticsearch/ESEventsUtil.scala | 44 ++------ .../data/storage/elasticsearch/ESLEvents.scala | 35 ++++--- .../data/storage/elasticsearch/ESUtils.scala | 103 +++++++++++++++++-- 3 files changed, 121 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala index 56f47ab..2edbc35 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala @@ -18,16 +18,14 @@ package org.apache.predictionio.data.storage.elasticsearch -import org.apache.hadoop.io.DoubleWritable -import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.MapWritable import org.apache.hadoop.io.Text import org.apache.predictionio.data.storage.DataMap import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.EventValidation import org.joda.time.DateTime -import org.joda.time.DateTimeZone import org.json4s._ +import org.json4s.native.Serialization.read +import org.json4s.native.Serialization.write object ESEventsUtil { @@ -53,23 +51,8 @@ object ESEventsUtil { } } - val tmp = result - .get(new Text("properties")).asInstanceOf[MapWritable] - .get(new Text("fields")).asInstanceOf[MapWritable] - .get(new Text("rating")) - - val rating = - if (tmp.isInstanceOf[DoubleWritable]) tmp.asInstanceOf[DoubleWritable] - else if (tmp.isInstanceOf[LongWritable]) { - new DoubleWritable(tmp.asInstanceOf[LongWritable].get().toDouble) - } - else null - - val properties: DataMap = - if (rating != null) DataMap(s"""{"rating":${rating.get().toString}}""") - else DataMap() - - + val properties: DataMap = getOptStringCol("properties") + .map(s => DataMap(read[JObject](s))).getOrElse(DataMap()) val eventId = Some(getStringCol("eventId")) val event = getStringCol("event") val entityType = getStringCol("entityType") @@ -77,17 +60,8 @@ object ESEventsUtil { val targetEntityType = getOptStringCol("targetEntityType") val targetEntityId = getOptStringCol("targetEntityId") val prId = getOptStringCol("prId") - val eventTimeZone = getOptStringCol("eventTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val eventTime = new DateTime( - getStringCol("eventTime"), eventTimeZone) - val creationTimeZone = getOptStringCol("creationTimeZone") - .map(DateTimeZone.forID(_)) - .getOrElse(EventValidation.defaultTimeZone) - val creationTime: DateTime = new DateTime( - getStringCol("creationTime"), creationTimeZone) - + val eventTime: DateTime = ESUtils.parseUTCDateTime(getStringCol("eventTime")) + val creationTime: DateTime = ESUtils.parseUTCDateTime(getStringCol("creationTime")) Event( eventId = eventId, @@ -112,11 +86,11 @@ object ESEventsUtil { "entityId" -> event.entityId, "targetEntityType" -> event.targetEntityType, "targetEntityId" -> event.targetEntityId, - "properties" -> event.properties.toJObject, - "eventTime" -> event.eventTime.toString, + "properties" -> write(event.properties.toJObject), + "eventTime" -> ESUtils.formatUTCDateTime(event.eventTime), "tags" -> event.tags, "prId" -> event.prId, - "creationTime" -> event.creationTime.toString + "creationTime" -> ESUtils.formatUTCDateTime(event.creationTime) ) } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index fdd370a..809a064 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -72,17 +72,11 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St ("entityId" -> ("type" -> "keyword")) ~ ("targetEntityType" -> ("type" -> "keyword")) ~ ("targetEntityId" -> ("type" -> "keyword")) ~ - ("properties" -> - ("type" -> "nested") ~ - ("properties" -> - ("fields" -> ("type" -> "nested") ~ - ("properties" -> - ("user" -> ("type" -> "long")) ~ - ("num" -> ("type" -> "long")))))) ~ - ("eventTime" -> ("type" -> "date")) ~ - ("tags" -> ("type" -> "keyword")) ~ - ("prId" -> ("type" -> "keyword")) ~ - ("creationTime" -> ("type" -> "date")))) + ("properties" -> ("type" -> "keyword")) ~ + ("eventTime" -> ("type" -> "date")) ~ + ("tags" -> ("type" -> "keyword")) ~ + ("prId" -> ("type" -> "keyword")) ~ + ("creationTime" -> ("type" -> "date")))) ESUtils.createMapping(restClient, index, estype, compact(render(json))) } finally { restClient.close() @@ -134,8 +128,19 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St while (exists(restClient, estype, roll)) roll = seq.genNext(seqName) roll.toString } - val json = write(event.copy(eventId = Some(id))) - val entity = new NStringEntity(json, ContentType.APPLICATION_JSON); + val json = + ("eventId" -> id) ~ + ("event" -> event.event) ~ + ("entityType" -> event.entityType) ~ + ("entityId" -> event.entityId) ~ + ("targetEntityType" -> event.targetEntityType) ~ + ("targetEntityId" -> event.targetEntityId) ~ + ("eventTime" -> ESUtils.formatUTCDateTime(event.eventTime)) ~ + ("tags" -> event.tags) ~ + ("prId" -> event.prId) ~ + ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~ + ("properties" -> write(event.properties.toJObject)) + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON); val response = restClient.performRequest( "POST", s"/$index/$estype/$id", @@ -275,8 +280,8 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St startTime, untilTime, entityType, entityId, eventNames, targetEntityType, targetEntityId, reversed) limit.getOrElse(20) match { - case -1 => ESUtils.getAll[Event](restClient, index, estype, query).toIterator - case size => ESUtils.get[Event](restClient, index, estype, query, size).toIterator + case -1 => ESUtils.getEventAll(restClient, index, estype, query).toIterator + case size => ESUtils.getEvents(restClient, index, estype, query, size).toIterator } } catch { case e: IOException => http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/ae51040b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index 72f4dd6..4eb117e 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -34,17 +34,72 @@ import org.joda.time.format.DateTimeFormat import org.joda.time.DateTimeZone import org.apache.predictionio.data.storage.StorageClientConfig import org.apache.http.HttpHost +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.storage.DataMap object ESUtils { val scrollLife = "1m" - def get[T: Manifest]( + def toEvent(value: JValue)( + implicit formats: Formats): Event = { + def getString(s: String): String = { + (value \ s) match { + case x if x == JNothing => null + case x => x.extract[String] + } + } + + def getOptString(s: String): Option[String] = { + getString(s) match { + case null => None + case x => Some(x) + } + } + + val properties: DataMap = getOptString("properties") + .map(s => DataMap(read[JObject](s))).getOrElse(DataMap()) + val eventId = getOptString("eventId") + val event = getString("event") + val entityType = getString("entityType") + val entityId = getString("entityId") + val targetEntityType = getOptString("targetEntityType") + val targetEntityId = getOptString("targetEntityId") + val prId = getOptString("prId") + val eventTime: DateTime = ESUtils.parseUTCDateTime(getString("eventTime")) + val creationTime: DateTime = ESUtils.parseUTCDateTime(getString("creationTime")) + val tags = (value \ "tags").extract[Seq[String]] + + Event( + eventId = eventId, + event = event, + entityType = entityType, + entityId = entityId, + targetEntityType = targetEntityType, + targetEntityId = targetEntityId, + properties = properties, + eventTime = eventTime, + tags = tags, + prId = prId, + creationTime = creationTime) + } + + def getEvents( client: RestClient, index: String, estype: String, query: String, size: Int)( - implicit formats: Formats): Seq[T] = { + implicit formats: Formats): Seq[Event] = { + getDocList(client, index, estype, query, size).map(x => toEvent(x)) + } + + def getDocList( + client: RestClient, + index: String, + estype: String, + query: String, + size: Int)( + implicit formats: Formats): Seq[JValue] = { val response = client.performRequest( "POST", s"/$index/$estype/_search", @@ -52,7 +107,7 @@ object ESUtils { new StringEntity(query)) val responseJValue = parse(EntityUtils.toString(response.getEntity)) val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]] - hits.map(h => (h \ "_source").extract[T]) + hits.map(h => (h \ "_source")) } def getAll[T: Manifest]( @@ -61,9 +116,27 @@ object ESUtils { estype: String, query: String)( implicit formats: Formats): Seq[T] = { + getDocAll(client, index, estype, query).map(x => x.extract[T]) + } + + def getEventAll( + client: RestClient, + index: String, + estype: String, + query: String)( + implicit formats: Formats): Seq[Event] = { + getDocAll(client, index, estype, query).map(x => toEvent(x)) + } + + def getDocAll( + client: RestClient, + index: String, + estype: String, + query: String)( + implicit formats: Formats): Seq[JValue] = { @scala.annotation.tailrec - def scroll(scrollId: String, hits: Seq[JValue], results: Seq[T]): Seq[T] = { + def scroll(scrollId: String, hits: Seq[JValue], results: Seq[JValue]): Seq[JValue] = { if (hits.isEmpty) results else { val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId) @@ -76,7 +149,7 @@ object ESUtils { val responseJValue = parse(EntityUtils.toString(response.getEntity)) scroll((responseJValue \ "_scroll_id").extract[String], (responseJValue \ "hits" \ "hits").extract[Seq[JValue]], - hits.map(h => (h \ "_source").extract[T]) ++ results) + hits.map(h => (h \ "_source").extract[JValue]) ++ results) } } @@ -87,8 +160,8 @@ object ESUtils { new StringEntity(query)) val responseJValue = parse(EntityUtils.toString(response.getEntity)) scroll((responseJValue \ "_scroll_id").extract[String], - (responseJValue \ "hits" \ "hits").extract[Seq[JValue]], - Nil) + (responseJValue \ "hits" \ "hits").extract[Seq[JValue]], + Nil) } def createIndex( @@ -131,6 +204,16 @@ object ESUtils { } } + def formatUTCDateTime(dt: DateTime): String = { + DateTimeFormat + .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(dt.withZone(DateTimeZone.UTC)) + } + + def parseUTCDateTime(str: String): DateTime = { + DateTimeFormat + .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").parseDateTime(str) + } + def createEventQuery( startTime: Option[DateTime] = None, untilTime: Option[DateTime] = None, @@ -142,13 +225,11 @@ object ESUtils { reversed: Option[Boolean] = None): String = { val mustQueries = Seq( startTime.map(x => { - val v = DateTimeFormat - .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC)) + val v = formatUTCDateTime(x) s"""{"range":{"eventTime":{"gte":"${v}"}}}""" }), untilTime.map(x => { - val v = DateTimeFormat - .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ").print(x.withZone(DateTimeZone.UTC)) + val v = formatUTCDateTime(x) s"""{"range":{"eventTime":{"lt":"${v}"}}}""" }), entityType.map(x => s"""{"term":{"entityType":"${x}"}}"""),
