This is an automated email from the ASF dual-hosted git repository. emergentorder pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/predictionio.git
The following commit(s) were added to refs/heads/develop by this push: new d71c423 [PIO-189] fix ES6 integration test (#488) d71c423 is described below commit d71c4230ae598e1647136c1bff1d646a79874cdf Author: Alex Merritt <leca...@gmail.com> AuthorDate: Thu Oct 25 12:20:24 2018 -0500 [PIO-189] fix ES6 integration test (#488) * [PIO-189] fix ES6 integration test --- .../predictionio/data/storage/elasticsearch/ESLEvents.scala | 10 +++++++++- .../predictionio/data/storage/elasticsearch/ESPEvents.scala | 5 ++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index 185be92..f275ec9 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -38,7 +38,7 @@ import org.json4s.ext.JodaTimeSerializers import grizzled.slf4j.Logging import org.apache.http.message.BasicHeader -class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: String) +class ESLEvents(val client: RestClient, config: StorageClientConfig, val baseIndex: String) extends LEvents with Logging { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all @@ -52,6 +52,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: override def init(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype ESUtils.createIndex(client, index, ESUtils.getNumberOfShards(config, index.toUpperCase), ESUtils.getNumberOfReplicas(config, index.toUpperCase)) @@ -77,6 +78,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype try { val json = ("query" -> @@ -107,6 +109,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { Future { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype try { val id = event.eventId.getOrElse { ESEventsUtil.getBase64UUID @@ -152,6 +155,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = { Future { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype try { val ids = events.map { event => event.eventId.getOrElse(ESEventsUtil.getBase64UUID) @@ -214,6 +218,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: } private def exists(client: RestClient, estype: String, id: Int): Boolean = { + val index = baseIndex + "_" + estype try { client.performRequest( "GET", @@ -242,6 +247,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { Future { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype try { val json = ("query" -> @@ -275,6 +281,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { Future { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype try { val json = ("query" -> @@ -311,6 +318,7 @@ class ESLEvents(val client: RestClient, config: StorageClientConfig, val index: (implicit ec: ExecutionContext): Future[Iterator[Event]] = { Future { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype try { val query = ESUtils.createEventQuery( startTime, untilTime, entityType, entityId, diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala index 75f7639..a86d378 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESPEvents.scala @@ -41,7 +41,7 @@ import org.json4s.native.JsonMethods._ import org.json4s.ext.JodaTimeSerializers -class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) +class ESPEvents(client: RestClient, config: StorageClientConfig, baseIndex: String) extends PEvents { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all @@ -78,6 +78,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) eventNames, targetEntityType, targetEntityId, None) val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype val conf = new Configuration() conf.set("es.resource", s"$index/$estype") conf.set("es.query", query) @@ -97,6 +98,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) events: RDD[Event], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype val conf = Map("es.resource" -> s"$index/$estype", "es.nodes" -> getESNodes()) events.map { event => ESEventsUtil.eventToPut(event, appId) @@ -107,6 +109,7 @@ class ESPEvents(client: RestClient, config: StorageClientConfig, index: String) eventIds: RDD[String], appId: Int, channelId: Option[Int])(sc: SparkContext): Unit = { val estype = getEsType(appId, channelId) + val index = baseIndex + "_" + estype eventIds.foreachPartition { iter => iter.foreach { eventId => try {