Repository: incubator-predictionio Updated Branches: refs/heads/develop ada0591f2 -> 945139be0
ES storage improvement/refactoring - Use the same ID generation for ES in event data - Reuse RestClient instance in ESLEvents - Replace 0 with false in mapping - Set Content-Type as application/json Closes #374 Project: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/commit/945139be Tree: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/tree/945139be Diff: http://git-wip-us.apache.org/repos/asf/incubator-predictionio/diff/945139be Branch: refs/heads/develop Commit: 945139be0a68e0733a3ab91d4ea4e4bfbde97c6f Parents: ada0591 Author: Shinsuke Sugaya <[email protected]> Authored: Wed May 3 09:06:05 2017 -0700 Committer: Donald Szeto <[email protected]> Committed: Wed May 3 09:06:05 2017 -0700 ---------------------------------------------------------------------- .../storage/elasticsearch/ESAccessKeys.scala | 2 +- .../data/storage/elasticsearch/ESApps.scala | 20 +++-- .../data/storage/elasticsearch/ESChannels.scala | 20 +++-- .../elasticsearch/ESEngineInstances.scala | 2 +- .../elasticsearch/ESEvaluationInstances.scala | 24 ++--- .../storage/elasticsearch/ESEventsUtil.scala | 95 +++++++++++++++++++- .../data/storage/elasticsearch/ESLEvents.scala | 69 +++++--------- .../storage/elasticsearch/ESSequences.scala | 10 +-- .../data/storage/elasticsearch/ESUtils.scala | 8 +- 9 files changed, 169 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala index 9278366..98c2781 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESAccessKeys.scala @@ -49,7 +49,7 @@ class ESAccessKeys(client: ESClient, config: StorageClientConfig, index: String) ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> - ("_all" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("key" -> ("type" -> "keyword")) ~ ("events" -> ("type" -> "keyword")))) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala index e7fe4af..0b319ab 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala @@ -50,7 +50,7 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> - ("_all" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("id" -> ("type" -> "keyword")) ~ ("name" -> ("type" -> "keyword")))) @@ -60,12 +60,18 @@ class ESApps(client: ESClient, config: StorageClientConfig, index: String) } def insert(app: App): Option[Int] = { - val id = - if (app.id == 0) { - var roll = seq.genNext(estype) - while (!get(roll).isEmpty) roll = seq.genNext(estype) - roll - } else app.id + val id = app.id match { + case v if v == 0 => + @scala.annotation.tailrec + def generateId: Int = { + seq.genNext(estype).toInt match { + case x if !get(x).isEmpty => generateId + case x => x + } + } + generateId + case v => v + } update(app.copy(id = id)) Some(id) } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala index a173c59..c142beb 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala @@ -49,7 +49,7 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String) ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> - ("_all" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("name" -> ("type" -> "keyword")))) ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) @@ -58,12 +58,18 @@ class ESChannels(client: ESClient, config: StorageClientConfig, index: String) } def insert(channel: Channel): Option[Int] = { - val id = - if (channel.id == 0) { - var roll = seq.genNext(estype) - while (!get(roll).isEmpty) roll = seq.genNext(estype) - roll - } else channel.id + val id = channel.id match { + case v if v == 0 => + @scala.annotation.tailrec + def generateId: Int = { + seq.genNext(estype).toInt match { + case x if !get(x).isEmpty => generateId + case x => x + } + } + generateId + case v => v + } if (update(channel.copy(id = id))) Some(id) else None } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala index e123744..de474c1 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala @@ -49,7 +49,7 @@ class ESEngineInstances(client: ESClient, config: StorageClientConfig, index: St ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> - ("_all" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("status" -> ("type" -> "keyword")) ~ ("startTime" -> ("type" -> "date")) ~ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala index 48f191a..9b19cf4 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEvaluationInstances.scala @@ -51,7 +51,7 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> - ("_all" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> false)) ~ ("properties" -> ("status" -> ("type" -> "keyword")) ~ ("startTime" -> ("type" -> "date")) ~ @@ -59,9 +59,9 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index ("evaluationClass" -> ("type" -> "keyword")) ~ ("engineParamsGeneratorClass" -> ("type" -> "keyword")) ~ ("batch" -> ("type" -> "keyword")) ~ - ("evaluatorResults" -> ("type" -> "text") ~ ("index" -> "no")) ~ - ("evaluatorResultsHTML" -> ("type" -> "text") ~ ("index" -> "no")) ~ - ("evaluatorResultsJSON" -> ("type" -> "text") ~ ("index" -> "no")))) + ("evaluatorResults" -> ("type" -> "text")) ~ + ("evaluatorResultsHTML" -> ("enabled" -> false)) ~ + ("evaluatorResultsJSON" -> ("enabled" -> false)))) ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) } finally { restClient.close() @@ -69,13 +69,17 @@ class ESEvaluationInstances(client: ESClient, config: StorageClientConfig, index def insert(i: EvaluationInstance): String = { val id = i.id match { - case x if x.isEmpty => - var roll = seq.genNext(estype).toString - while (!get(roll).isEmpty) roll = seq.genNext(estype).toString - roll - case x => x + case v if v.isEmpty => + @scala.annotation.tailrec + def generateId: String = { + seq.genNext(estype).toString match { + case x if !get(x).isEmpty => generateId + case x => x + } + } + generateId + case v => v } - update(i.copy(id = id)) id } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala index 2edbc35..ec72a49 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEventsUtil.scala @@ -18,6 +18,13 @@ package org.apache.predictionio.data.storage.elasticsearch +import java.net.NetworkInterface +import java.net.SocketException +import java.security.SecureRandom +import java.util.Base64 +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + import org.apache.hadoop.io.MapWritable import org.apache.hadoop.io.Text import org.apache.predictionio.data.storage.DataMap @@ -27,6 +34,7 @@ import org.json4s._ import org.json4s.native.Serialization.read import org.json4s.native.Serialization.write + object ESEventsUtil { implicit val formats = DefaultFormats @@ -80,7 +88,7 @@ object ESEventsUtil { def eventToPut(event: Event, appId: Int): Map[String, Any] = { Map( - "eventId" -> event.eventId, + "eventId" -> event.eventId.getOrElse { getBase64UUID }, "event" -> event.event, "entityType" -> event.entityType, "entityId" -> event.entityId, @@ -94,4 +102,89 @@ object ESEventsUtil { ) } + val secureRandom: SecureRandom = new SecureRandom() + + val sequenceNumber: AtomicInteger = new AtomicInteger(secureRandom.nextInt()) + + val lastTimestamp: AtomicLong = new AtomicLong(0) + + val secureMungedAddress: Array[Byte] = { + val address = getMacAddress match { + case Some(x) => x + case None => + val dummy: Array[Byte] = new Array[Byte](6) + secureRandom.nextBytes(dummy) + dummy(0) = (dummy(0) | 0x01.toByte).toByte + dummy + } + + val mungedBytes: Array[Byte] = new Array[Byte](6) + secureRandom.nextBytes(mungedBytes) + for (i <- 0 until 6) { + mungedBytes(i) = (mungedBytes(i) ^ address(i)).toByte + } + + mungedBytes + } + + def getMacAddress(): Option[Array[Byte]] = { + try { + NetworkInterface.getNetworkInterfaces match { + case en if en == null => None + case en => + new Iterator[NetworkInterface] { + def next = en.nextElement + def hasNext = en.hasMoreElements + }.foldLeft(None: Option[Array[Byte]])((x, y) => + x match { + case None => + y.isLoopback match { + case true => + y.getHardwareAddress match { + case address if isValidAddress(address) => Some(address) + case _ => None + } + case false => None + } + case _ => x + }) + } + } catch { + case e: SocketException => None + } + } + + def isValidAddress(address: Array[Byte]): Boolean = { + address match { + case v if v == null || v.length != 6 => false + case v => v.exists(b => b != 0x00.toByte) + } + } + + def putLong(array: Array[Byte], l: Long, pos: Int, numberOfLongBytes: Int): Unit = { + for (i <- 0 until numberOfLongBytes) { + array(pos + numberOfLongBytes - i - 1) = (l >>> (i * 8)).toByte + } + } + + def getBase64UUID(): String = { + val sequenceId: Int = sequenceNumber.incrementAndGet & 0xffffff + val timestamp: Long = synchronized { + val t = Math.max(lastTimestamp.get, System.currentTimeMillis) + if (sequenceId == 0) { + lastTimestamp.set(t + 1) + } else { + lastTimestamp.set(t) + } + lastTimestamp.get + } + + val uuidBytes: Array[Byte] = new Array[Byte](15) + + putLong(uuidBytes, timestamp, 0, 6) + System.arraycopy(secureMungedAddress, 0, uuidBytes, 6, secureMungedAddress.length) + putLong(uuidBytes, sequenceId, 12, 3) + + Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes) + } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala index 18e2fed..6cf7a9a 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESLEvents.scala @@ -45,8 +45,7 @@ import org.apache.http.entity.StringEntity class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String) extends LEvents with Logging { implicit val formats = DefaultFormats.lossless ++ JodaTimeSerializers.all - private val seq = new ESSequences(client, config, index) - private val seqName = "events" + val restClient = client.open() def getEsType(appId: Int, channelId: Option[Int] = None): String = { channelId.map { ch => @@ -58,37 +57,31 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St override def init(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) - val restClient = client.open() - try { - ESUtils.createIndex(restClient, index, - ESUtils.getNumberOfShards(config, index.toUpperCase), - ESUtils.getNumberOfReplicas(config, index.toUpperCase)) - val json = - (estype -> - ("_all" -> ("enabled" -> 0)) ~ - ("properties" -> - ("name" -> ("type" -> "keyword")) ~ - ("eventId" -> ("type" -> "keyword")) ~ - ("event" -> ("type" -> "keyword")) ~ - ("entityType" -> ("type" -> "keyword")) ~ - ("entityId" -> ("type" -> "keyword")) ~ - ("targetEntityType" -> ("type" -> "keyword")) ~ - ("targetEntityId" -> ("type" -> "keyword")) ~ - ("properties" -> ("enabled" -> 0)) ~ - ("eventTime" -> ("type" -> "date")) ~ - ("tags" -> ("type" -> "keyword")) ~ - ("prId" -> ("type" -> "keyword")) ~ - ("creationTime" -> ("type" -> "date")))) - ESUtils.createMapping(restClient, index, estype, compact(render(json))) - } finally { - restClient.close() - } + ESUtils.createIndex(restClient, index, + ESUtils.getNumberOfShards(config, index.toUpperCase), + ESUtils.getNumberOfReplicas(config, index.toUpperCase)) + val json = + (estype -> + ("_all" -> ("enabled" -> false)) ~ + ("properties" -> + ("name" -> ("type" -> "keyword")) ~ + ("eventId" -> ("type" -> "keyword")) ~ + ("event" -> ("type" -> "keyword")) ~ + ("entityType" -> ("type" -> "keyword")) ~ + ("entityId" -> ("type" -> "keyword")) ~ + ("targetEntityType" -> ("type" -> "keyword")) ~ + ("targetEntityId" -> ("type" -> "keyword")) ~ + ("properties" -> ("enabled" -> false)) ~ + ("eventTime" -> ("type" -> "date")) ~ + ("tags" -> ("type" -> "keyword")) ~ + ("prId" -> ("type" -> "keyword")) ~ + ("creationTime" -> ("type" -> "date")))) + ESUtils.createMapping(restClient, index, estype, compact(render(json))) true } override def remove(appId: Int, channelId: Option[Int] = None): Boolean = { val estype = getEsType(appId, channelId) - val restClient = client.open() try { val json = ("query" -> @@ -108,13 +101,11 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St case e: Exception => error(s"Failed to remove $index/$estype", e) false - } finally { - restClient.close() } } override def close(): Unit = { - // nothing + restClient.close() } override def futureInsert( @@ -123,12 +114,9 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St channelId: Option[Int])(implicit ec: ExecutionContext): Future[String] = { Future { val estype = getEsType(appId, channelId) - val restClient = client.open() try { val id = event.eventId.getOrElse { - var roll = seq.genNext(seqName) - while (exists(restClient, estype, roll)) roll = seq.genNext(seqName) - roll.toString + ESEventsUtil.getBase64UUID } val json = ("eventId" -> id) ~ @@ -161,8 +149,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St case e: IOException => error(s"Failed to update $index/$estype/<id>", e) "" - } finally { - restClient.close() } } } @@ -196,7 +182,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St channelId: Option[Int])(implicit ec: ExecutionContext): Future[Option[Event]] = { Future { val estype = getEsType(appId, channelId) - val restClient = client.open() try { val json = ("query" -> @@ -220,8 +205,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St case e: IOException => error("Failed to access to /$index/$estype/_search", e) None - } finally { - restClient.close() } } } @@ -232,7 +215,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St channelId: Option[Int])(implicit ec: ExecutionContext): Future[Boolean] = { Future { val estype = getEsType(appId, channelId) - val restClient = client.open() try { val json = ("query" -> @@ -255,8 +237,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St case e: IOException => error(s"Failed to update $index/$estype:$eventId", e) false - } finally { - restClient.close() } } } @@ -276,7 +256,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St (implicit ec: ExecutionContext): Future[Iterator[Event]] = { Future { val estype = getEsType(appId, channelId) - val restClient = client.open() try { val query = ESUtils.createEventQuery( startTime, untilTime, entityType, entityId, @@ -289,8 +268,6 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St case e: IOException => error(e.getMessage) Iterator[Event]() - } finally { - restClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala index 8a6a047..9fd31a3 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESSequences.scala @@ -46,15 +46,15 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String) ESUtils.getNumberOfReplicas(config, index.toUpperCase)) val mappingJson = (estype -> - ("_all" -> ("enabled" -> 0)) ~ + ("_all" -> ("enabled" -> false)) ~ ("properties" -> - ("n" -> ("enabled" -> 0)))) + ("n" -> ("enabled" -> false)))) ESUtils.createMapping(restClient, index, estype, compact(render(mappingJson))) } finally { restClient.close() } - def genNext(name: String): Int = { + def genNext(name: String): Long = { val restClient = client.open() try { val entity = new NStringEntity(write("n" -> name), ContentType.APPLICATION_JSON) @@ -67,9 +67,9 @@ class ESSequences(client: ESClient, config: StorageClientConfig, index: String) val result = (jsonResponse \ "result").extract[String] result match { case "created" => - (jsonResponse \ "_version").extract[Int] + (jsonResponse \ "_version").extract[Long] case "updated" => - (jsonResponse \ "_version").extract[Int] + (jsonResponse \ "_version").extract[Long] case _ => throw new IllegalStateException(s"[$result] Failed to update $index/$estype/$name") } http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/945139be/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala ---------------------------------------------------------------------- diff --git a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala index 34c76eb..2bf18ef 100644 --- a/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala +++ b/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESUtils.scala @@ -100,11 +100,12 @@ object ESUtils { query: String, size: Int)( implicit formats: Formats): Seq[JValue] = { + val entity = new NStringEntity(query, ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", s"/$index/$estype/_search", Map("size" -> s"${size}"), - new StringEntity(query)) + entity) val responseJValue = parse(EntityUtils.toString(response.getEntity)) val hits = (responseJValue \ "hits" \ "hits").extract[Seq[JValue]] hits.map(h => (h \ "_source")) @@ -140,7 +141,7 @@ object ESUtils { if (hits.isEmpty) results else { val json = ("scroll" -> scrollLife) ~ ("scroll_id" -> scrollId) - val scrollBody = new StringEntity(compact(render(json))) + val scrollBody = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", "/_search/scroll", @@ -153,11 +154,12 @@ object ESUtils { } } + val entity = new NStringEntity(query, ContentType.APPLICATION_JSON) val response = client.performRequest( "POST", s"/$index/$estype/_search", Map("scroll" -> scrollLife), - new StringEntity(query)) + entity) val responseJValue = parse(EntityUtils.toString(response.getEntity)) scroll((responseJValue \ "_scroll_id").extract[String], (responseJValue \ "hits" \ "hits").extract[Seq[JValue]],
