Repository: incubator-predictionio Updated Branches: refs/heads/develop a36fbacae -> 4b172f57e
[PIO-90] Improve /batch/events.json endpoint performance Closes #386 Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/4b172f57 Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/4b172f57 Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/4b172f57 Branch: refs/heads/develop Commit: 4b172f57e03d2fe022a9d21100b233b1703d7758 Parents: a36fbac Author: Naoki Takezoe <[email protected]> Authored: Mon Jun 12 22:27:57 2017 +0900 Committer: Shinsuke Sugaya <[email protected]> Committed: Mon Jun 12 22:27:57 2017 +0900 ---------------------------------------------------------------------- .../predictionio/data/api/EventServer.scala | 96 ++++++++++++-------- .../predictionio/data/storage/LEvents.scala | 21 +++++ .../data/storage/elasticsearch/ESLEvents.scala | 76 ++++++++++++++-- .../data/storage/hbase/HBLEvents.scala | 14 +++ .../data/storage/jdbc/JDBCLEvents.scala | 59 ++++++++++-- 5 files changed, 215 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala index b4392ff..75c2227 100644 --- a/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala +++ b/data/src/main/scala/org/apache/predictionio/data/api/EventServer.scala @@ -384,50 +384,70 @@ class EventServiceActor( val appId = authData.appId val channelId = authData.channelId val allowedEvents = authData.events - val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = { - case Success(event) => { - if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) { - pluginContext.inputBlockers.values.foreach( - _.process(EventInfo( - appId = appId, - channelId = channelId, - event = event), pluginContext)) - val data = eventClient.futureInsert(event, appId, channelId).map { id => - pluginsActorRef ! EventInfo( - appId = appId, - channelId = channelId, - event = event) - val status = StatusCodes.Created - val result = Map( - "status" -> status.intValue, - "eventId" -> s"${id}") - if (config.stats) { - statsActorRef ! Bookkeeping(appId, status, event) + + entity(as[Seq[Try[Event]]]) { events => + complete { + if (events.length <= MaxNumberOfEventsPerBatchRequest) { + val eventWithIndex = events.zipWithIndex + + val taggedEvents = eventWithIndex.collect { case (Success(event), i) => + if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){ + (Right(event), i) + } else { + (Left(event), i) } - result - }.recover { case exception => + } + + val insertEvents = taggedEvents.collect { case (Right(event), i) => + (event, i) + } + + insertEvents.foreach { case (event, i) => + pluginContext.inputBlockers.values.foreach( + _.process(EventInfo( + appId = appId, + channelId = channelId, + event = event), pluginContext)) + } + + val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch( + insertEvents.map(_._1), appId, channelId).map { insertResults => + val results = insertResults.zip(insertEvents).map { case (id, (event, i)) => + pluginsActorRef ! EventInfo( + appId = appId, + channelId = channelId, + event = event) + val status = StatusCodes.Created + if (config.stats) { + statsActorRef ! Bookkeeping(appId, status, event) + } + (Map( + "status" -> status.intValue, + "eventId" -> s"${id}"), i) + } ++ + // Results of denied events + taggedEvents.collect { case (Left(event), i) => + (Map( + "status" -> StatusCodes.Forbidden.intValue, + "message" -> s"${event.event} events are not allowed"), i) + } ++ + // Results of failed to deserialze events + eventWithIndex.collect { case (Failure(exception), i) => + (Map( + "status" -> StatusCodes.BadRequest.intValue, + "message" -> s"${exception.getMessage()}"), i) + } + + // Restore original order + results.sortBy { case (_, i) => i }.map { case (data, _) => data } + } + + f.recover { case exception => Map( "status" -> StatusCodes.InternalServerError.intValue, "message" -> s"${exception.getMessage()}") } - data - } else { - Future.successful(Map( - "status" -> StatusCodes.Forbidden.intValue, - "message" -> s"${event.event} events are not allowed")) - } - } - case Failure(exception) => { - Future.successful(Map( - "status" -> StatusCodes.BadRequest.intValue, - "message" -> s"${exception.getMessage()}")) - } - } - entity(as[Seq[Try[Event]]]) { events => - complete { - if (events.length <= MaxNumberOfEventsPerBatchRequest) { - Future.traverse(events)(handleEvent) } else { (StatusCodes.BadRequest, Map("message" -> (s"Batch request must have less than or equal to " + http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala ---------------------------------------------------------------------- diff --git a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala index f7a980c..65b8779 100644 --- a/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala +++ b/data/src/main/scala/org/apache/predictionio/data/storage/LEvents.scala @@ -91,6 +91,27 @@ trait LEvents { event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] /** :: DeveloperApi :: + * Insert [[Event]]s in a non-blocking fashion. + * + * Default implementation of this method is calling + * [[LEvents.futureInsert(Event, Int, Option[Int])]] per event. + * Override in the storage implementation if the storage has + * a better way to insert multiple data at once. + * + * @param events [[Event]]s to be inserted + * @param appId App ID for the [[Event]]s to be inserted to + * @param channelId Optional channel ID for the [[Event]]s to be inserted to + */ + @DeveloperApi + def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int]) + (implicit ec: ExecutionContext): Future[Seq[String]] = { + val seq = events.map { event => + futureInsert(event, appId, channelId) + } + Future.sequence(seq) + } + + /** :: DeveloperApi :: * Get an [[Event]] in a non-blocking fashion. * * @param eventId ID of the [[Event]] http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index 6cf7a9a..6240059 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -22,8 +22,7 @@ import java.io.IOException import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext import scala.concurrent.Future - -import org.apache.http.entity.ContentType +import org.apache.http.entity.{ContentType, StringEntity} import org.apache.http.nio.entity.NStringEntity import org.apache.http.util.EntityUtils import org.apache.predictionio.data.storage.Event @@ -34,13 +33,11 @@ import org.joda.time.DateTime import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.native.JsonMethods._ -import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write import org.json4s.ext.JodaTimeSerializers - import grizzled.slf4j.Logging import org.elasticsearch.client.ResponseException -import org.apache.http.entity.StringEntity +import org.apache.http.message.BasicHeader class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) extends LEvents with Logging { @@ -130,7 +127,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St ("prId" -> event.prId) ~ ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~ ("properties" -> write(event.properties.toJObject)) - val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON); + val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = restClient.performRequest( "POST", s"/$index/$estype/$id", @@ -153,6 +150,73 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St } } + override def futureInsertBatch( + events: Seq[Event], + appId: Int, + channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = { + Future { + val estype = getEsType(appId, channelId) + try { + val ids = events.map { event => + event.eventId.getOrElse(ESEventsUtil.getBase64UUID) + } + + val json = events.zip(ids).map { case (event, id) => + val commandJson = + ("index" -> ( + ("_index" -> index) ~ + ("_type" -> estype) ~ + ("_id" -> id) + )) + + val documentJson = + ("eventId" -> id) ~ + ("event" -> event.event) ~ + ("entityType" -> event.entityType) ~ + ("entityId" -> event.entityId) ~ + ("targetEntityType" -> event.targetEntityType) ~ + ("targetEntityId" -> event.targetEntityId) ~ + ("eventTime" -> ESUtils.formatUTCDateTime(event.eventTime)) ~ + ("tags" -> event.tags) ~ + ("prId" -> event.prId) ~ + ("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~ + ("properties" -> write(event.properties.toJObject)) + + compact(render(commandJson)) + "\n" + compact(render(documentJson)) + + }.mkString("", "\n", "\n") + + val entity = new StringEntity(json) + val response = restClient.performRequest( + "POST", + "/_bulk", + Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava, + entity, + new BasicHeader("Content-Type", "application/x-ndjson")) + + val responseJson = parse(EntityUtils.toString(response.getEntity)) + val items = (responseJson \ "items").asInstanceOf[JArray] + + items.arr.map { case value: JObject => + val result = (value \ "index" \ "result").extract[String] + val id = (value \ "index" \ "_id").extract[String] + + result match { + case "created" => id + case "updated" => id + case _ => + error(s"[$result] Failed to update $index/$estype/$id") + "" + } + } + } catch { + case e: IOException => + error(s"Failed to update $index/$estype/<id>", e) + Nil + } + } + } + private def exists(restClient: RestClient, estype: String, id: Int): Boolean = { try { restClient.performRequest( http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala index 360b007..e95e7e8 100644 --- a/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala +++ b/storage/hbase/src/main/scala/org/apache/predictionio/data/storage/hbase/HBLEvents.scala @@ -110,6 +110,20 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace } override + def futureInsertBatch( + events: Seq[Event], appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): + Future[Seq[String]] = { + Future { + val table = getTable(appId, channelId) + val (puts, rowKeys) = events.map { event => HBEventsUtil.eventToPut(event, appId) }.unzip + table.put(puts) + table.flushCommits() + table.close() + rowKeys.map(_.toString) + } + } + + override def futureGet( eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/4b172f57/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala index dddef67..b4230cc 100644 --- a/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala +++ b/storage/jdbc/src/main/scala/org/apache/predictionio/data/storage/jdbc/JDBCLEvents.scala @@ -40,7 +40,7 @@ class JDBCLEvents( namespace: String) extends LEvents with Logging { implicit private val formats = org.json4s.DefaultFormats - def init(appId: Int, channelId: Option[Int] = None): Boolean = { + override def init(appId: Int, channelId: Option[Int] = None): Boolean = { // To use index, it must be varchar less than 255 characters on a VARCHAR column val useIndex = config.properties.contains("INDEX") && @@ -91,7 +91,7 @@ class JDBCLEvents( } } - def remove(appId: Int, channelId: Option[Int] = None): Boolean = + override def remove(appId: Int, channelId: Option[Int] = None): Boolean = DB autoCommit { implicit session => SQL(s""" drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)} @@ -99,9 +99,9 @@ class JDBCLEvents( true } - def close(): Unit = ConnectionPool.closeAll() + override def close(): Unit = ConnectionPool.closeAll() - def futureInsert(event: Event, appId: Int, channelId: Option[Int])( + override def futureInsert(event: Event, appId: Int, channelId: Option[Int])( implicit ec: ExecutionContext): Future[String] = Future { DB localTx { implicit session => val id = event.eventId.getOrElse(JDBCUtils.generateId) @@ -127,7 +127,52 @@ class JDBCLEvents( } } - def futureGet(eventId: String, appId: Int, channelId: Option[Int])( + override def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])( + implicit ec: ExecutionContext): Future[Seq[String]] = Future { + DB localTx { implicit session => + val ids = events.map(_.eventId.getOrElse(JDBCUtils.generateId)) + val params = events.zip(ids).map { case (event, id) => + Seq( + 'id -> id, + 'event -> event.event, + 'entityType -> event.entityType, + 'entityId -> event.entityId, + 'targetEntityType -> event.targetEntityType, + 'targetEntityId -> event.targetEntityId, + 'properties -> write(event.properties.toJObject), + 'eventTime -> event.eventTime, + 'eventTimeZone -> event.eventTime.getZone.getID, + 'tags -> (if(event.tags.nonEmpty) Some(event.tags.mkString(",")) else None), + 'prId -> event.prId, + 'creationTime -> event.creationTime, + 'creationTimeZone -> event.creationTime.getZone.getID + ) + } + + val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) + sql""" + insert into $tableName values( + {id}, + {event}, + {entityType}, + {entityId}, + {targetEntityType}, + {targetEntityId}, + {properties}, + {eventTime}, + {eventTimeZone}, + {tags}, + {prId}, + {creationTime}, + {creationTimeZone} + ) + """.batchByName(params: _*).apply() + + ids + } + } + + override def futureGet(eventId: String, appId: Int, channelId: Option[Int])( implicit ec: ExecutionContext): Future[Option[Event]] = Future { DB readOnly { implicit session => val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) @@ -152,7 +197,7 @@ class JDBCLEvents( } } - def futureDelete(eventId: String, appId: Int, channelId: Option[Int])( + override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])( implicit ec: ExecutionContext): Future[Boolean] = Future { DB localTx { implicit session => val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId)) @@ -163,7 +208,7 @@ class JDBCLEvents( } } - def futureFind( + override def futureFind( appId: Int, channelId: Option[Int] = None, startTime: Option[DateTime] = None,
